MCPcopy
hub / github.com/uber/petastorm / LocalDiskCache

Class LocalDiskCache

petastorm/local_disk_cache.py:23–82  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

21
22
class LocalDiskCache(CacheBase):
    """Petastorm cache implementation that stores values in an on-disk ``diskcache.FanoutCache``."""

    # Private sentinel used to tell a cache miss apart from a cached value that happens to be ``None``.
    _MISSING = object()

    def __init__(self, path, size_limit_bytes, expected_row_size_bytes, shards=6, cleanup=True, **settings):
        """LocalDiskCache is an adapter to a diskcache implementation.

        LocalDiskCache can be used by a petastorm Reader class to temporarily keep parts of the dataset on a local
        file system storage.

        :param path: Path where the dataset cache is being stored.
        :param size_limit_bytes: Maximal size of the disk-space to be used by cache. The size of the cache may actually
            grow somewhat above the size_limit_bytes, so the limit is not very strict.
        :param expected_row_size_bytes: Approximate size of a single row. This argument is used to perform a sanity
            check on the capacity of individual shards.
        :param shards: Cache can be sharded. Larger number of shards improve writing parallelism. Must be >= 1.
        :param cleanup: If set to True, cache directory would be removed when cleanup() method is called.
        :param settings: these parameters passed-through to the diskcache.Cache class.
            For details, see: http://www.grantjenks.com/docs/diskcache/tutorial.html#settings
            A ``size_limit`` entry here overrides ``size_limit_bytes``; the overriding value is then the one that is
            validated and enforced.
        :raises ValueError: if ``shards`` is less than 1, or if the eviction policy is not ``'none'`` and
            ``size_limit / shards < 5 * expected_row_size_bytes``.
        """
        if shards < 1:
            # Fail with a clear message instead of a ZeroDivisionError in the capacity check below.
            raise ValueError(f'shards must be a positive integer, got {shards}')

        default_settings = {
            'size_limit': size_limit_bytes,
            'eviction_policy': 'least-recently-stored',
            'disk_pickle_protocol': pickle.HIGHEST_PROTOCOL,
        }
        default_settings.update(settings)

        # Caller-supplied settings may override 'size_limit'. Validate and enforce the limit FanoutCache is
        # actually configured with, not the (possibly overridden) size_limit_bytes argument.
        size_limit = default_settings['size_limit']

        # Each shard gets roughly size_limit / shards of the budget; if that cannot hold a few rows, freshly
        # stored values could be evicted right away.
        if default_settings['eviction_policy'] != 'none' and size_limit / shards < 5 * expected_row_size_bytes:
            raise ValueError('Condition \'size_limit_bytes / shards >= 5 * expected_row_size_bytes\' needs to hold, '
                             'otherwise, newly added cached values might end up being immediately evicted.')

        self._cleanup = cleanup
        self._path = path
        self._size_limit_bytes = size_limit
        self._default_settings = default_settings
        self._cache = FanoutCache(path, shards, **default_settings)

    def get(self, key, fill_cache_func):
        """Return the value cached under ``key``, computing it with ``fill_cache_func`` on a miss.

        On a miss the computed value is stored in the cache, except when the eviction policy is ``'none'`` and
        the cache volume has already reached the size limit; in that case the value is returned without storing.

        :param key: Cache key.
        :param fill_cache_func: Zero-argument callable producing the value when ``key`` is not cached.
        :return: The cached or freshly computed value.
        """
        value = self._cache.get(key, default=self._MISSING)
        if value is not self._MISSING:
            return value

        value = fill_cache_func()
        if self._default_settings['eviction_policy'] == 'none':
            # With eviction disabled nothing can make room, so only store while still under the limit.
            if self._cache.volume() < self._size_limit_bytes:
                self._cache.set(key, value)
        else:
            # Rely on the configured diskcache eviction policy to make room for the new value.
            self._cache.set(key, value)
        return value

    def cleanup(self):
        """Close the cache and delete its directory if the cache was created with ``cleanup=True``.

        Does nothing when ``cleanup=False`` was passed to the constructor. An ``OSError``/``AttributeError`` raised
        while closing is logged and does not prevent deleting the directory. A directory that no longer exists is
        not treated as an error; other ``shutil.rmtree`` failures propagate.
        """
        if not self._cleanup:
            return

        try:
            # Close the cache before deleting the files it has open.
            self._cache.close()
        except (OSError, AttributeError) as e:
            import logging
            logging.getLogger(__name__).warning('Error closing cache: %s', e)

        try:
            shutil.rmtree(self._path, ignore_errors=False)
        except FileNotFoundError:
            # OK: it's already gone
            pass

Calls

no outgoing calls

Used in the wild: real call sites across dependent graphs

searching dependent graphs…