| 21 | |
| 22 | |
class LocalDiskCache(CacheBase):
    """Cache adapter that stores values in a sharded ``diskcache.FanoutCache`` on the local file system."""

    # Private sentinel: lets ``get`` tell "key not present" apart from a cached value that is ``None``.
    _MISSING = object()

    def __init__(self, path, size_limit_bytes, expected_row_size_bytes, shards=6, cleanup=True, **settings):
        """LocalDiskCache is an adapter to a diskcache implementation.

        LocalDiskCache can be used by a petastorm Reader class to temporarily keep parts of the dataset on a local
        file system storage.

        :param path: Path where the dataset cache is being stored.
        :param size_limit_bytes: Maximal size of the disk-space to be used by cache. The size of the cache may actually
            grow somewhat above the size_limit_bytes, so the limit is not very strict. If ``settings`` contains a
            ``size_limit`` entry, that entry takes precedence and is the limit used everywhere in this class.
        :param expected_row_size_bytes: Approximate size of a single row. This argument is used to perform a sanity
            check on the capacity of individual shards.
        :param shards: Cache can be sharded. Larger number of shards improve writing parallelism. Must be >= 1.
        :param cleanup: If set to True, cleanup() closes the cache and removes the cache directory; if False,
            cleanup() does nothing.
        :param settings: these parameters are passed through to ``diskcache.FanoutCache`` and override the defaults
            set here (``size_limit``, ``eviction_policy``, ``disk_pickle_protocol``).
            For details, see: http://www.grantjenks.com/docs/diskcache/tutorial.html#settings
        :raises ValueError: if ``shards`` is smaller than 1, or if eviction is enabled and a single shard's share of
            the size limit is smaller than 5 rows of ``expected_row_size_bytes``.
        """
        if shards < 1:
            raise ValueError(f'shards must be a positive integer, got {shards}')

        default_settings = {
            'size_limit': size_limit_bytes,
            'eviction_policy': 'least-recently-stored',
            'disk_pickle_protocol': pickle.HIGHEST_PROTOCOL,
        }
        default_settings.update(settings)

        # Validate and enforce the limit that is actually handed to diskcache; a caller-supplied
        # ``size_limit`` in settings would otherwise silently diverge from ``size_limit_bytes``.
        effective_size_limit = default_settings['size_limit']

        eviction_enabled = default_settings['eviction_policy'] != 'none'
        if eviction_enabled and effective_size_limit / shards < 5 * expected_row_size_bytes:
            raise ValueError('Condition \'size_limit_bytes / shards >= 5 * expected_row_size_bytes\' needs to hold, '
                             'otherwise, newly added cached values might end up being immediately evicted.')

        self._cleanup = cleanup
        self._path = path
        self._size_limit_bytes = effective_size_limit
        self._default_settings = default_settings
        self._cache = FanoutCache(path, shards, **default_settings)

    def get(self, key, fill_cache_func):
        """Return the value cached under ``key``, computing and caching it on a miss.

        :param key: Cache key.
        :param fill_cache_func: Zero-argument callable that produces the value when ``key`` is not cached.
        :return: The cached value, or the freshly computed one on a miss.
        """
        value = self._cache.get(key, default=self._MISSING)
        if value is not self._MISSING:
            return value

        value = fill_cache_func()
        if self._default_settings['eviction_policy'] == 'none':
            # With eviction disabled nothing is ever removed, so cap growth here by skipping the write
            # once the cache is full. The volume check and the write are not atomic; the cap is approximate.
            if self._cache.volume() < self._size_limit_bytes:
                self._cache.set(key, value)
        else:
            # diskcache applies the configured eviction policy itself to stay within size_limit.
            self._cache.set(key, value)
        return value

    def cleanup(self):
        """Close the cache and delete its directory, but only if created with ``cleanup=True``.

        A failure while closing the cache is logged and does not prevent the directory removal.
        A cache directory that no longer exists is not treated as an error.
        """
        if not self._cleanup:
            return

        import logging

        try:
            # Close the cache before deleting the files it has open.
            self._cache.close()
        except (OSError, AttributeError) as e:
            logging.getLogger(__name__).warning('Error closing cache: %s', e)
        try:
            shutil.rmtree(self._path)
        except FileNotFoundError:
            # OK: it's already gone
            pass
no outgoing calls
searching dependent graphs…