hub / github.com/ray-project/ray / CloudObjectCache

Class CloudObjectCache

python/ray/llm/_internal/common/utils/cloud_utils.py:346–477  ·  view source on GitHub ↗

A cache that works with both sync and async fetch functions. It caches the result of a function call, typically one that fetches a value from a cloud object store.
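To make the "sync and async" part concrete, here is a minimal sketch of how a fetch function can be classified with `inspect.iscoroutinefunction`, mirroring the check in the constructor shown in the source. The helper `is_async_fetch` and the three example fetchers are hypothetical names introduced only for illustration; they are not part of Ray.

```python
import asyncio
import inspect
from typing import Any, Callable


def is_async_fetch(fetch_fn: Callable[[str], Any]) -> bool:
    """Return True if fetch_fn is a coroutine function or an object
    whose __call__ is one (hypothetical helper, not Ray API)."""
    return inspect.iscoroutinefunction(fetch_fn) or (
        callable(fetch_fn) and inspect.iscoroutinefunction(fetch_fn.__call__)
    )


def sync_fetch(key: str) -> str:
    # Plain function: calling it returns the value directly.
    return f"sync:{key}"


async def async_fetch(key: str) -> str:
    # Coroutine function: calling it returns an awaitable.
    return f"async:{key}"


class AsyncFetcher:
    """Callable object; the instance itself is not a coroutine function,
    but its __call__ method is, so the second half of the check catches it."""

    async def __call__(self, key: str) -> str:
        return f"callable:{key}"


sync_detected = is_async_fetch(sync_fetch)
async_detected = is_async_fetch(async_fetch)
callable_detected = is_async_fetch(AsyncFetcher())
awaited_value = asyncio.run(AsyncFetcher()("adapter-a"))
```

The second half of the check matters for callable objects: without it, an instance such as `AsyncFetcher()` would be treated as a sync function and its coroutine would never be awaited.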


class CloudObjectCache:
    """A cache that works with both sync and async fetch functions.

    The purpose of this data structure is to cache the result of a function call
    usually used to fetch a value from a cloud object store.

    The idea is this:
    - Cloud operations are expensive
    - In LoRA specifically, we would fetch remote storage to download the model weights
    at each request.
    - If the same model is requested many times, we don't want to inflate the time to first token.
    - We control the cache via not only the least recently used eviction policy, but also
    by expiring cache entries after a certain time.
    - If the object is missing, we cache the missing status for a small duration while if
    the object exists, we cache the object for a longer duration.
    """

    def __init__(
        self,
        max_size: int,
        fetch_fn: Union[Callable[[str], Any], Callable[[str], Awaitable[Any]]],
        missing_expire_seconds: Optional[int] = None,
        exists_expire_seconds: Optional[int] = None,
        missing_object_value: Any = object(),
    ):
        """Initialize the cache.

        Args:
            max_size: Maximum number of items to store in cache
            fetch_fn: Function to fetch values (can be sync or async)
            missing_expire_seconds: How long to cache missing objects (None for no expiration)
            exists_expire_seconds: How long to cache existing objects (None for no expiration)
            missing_object_value: Sentinel value used to represent a missing object in the cache.
        """
        self._cache: Dict[str, _CacheEntry] = {}
        self._max_size = max_size
        self._fetch_fn = fetch_fn
        self._missing_expire_seconds = missing_expire_seconds
        self._exists_expire_seconds = exists_expire_seconds
        self._is_async = inspect.iscoroutinefunction(fetch_fn) or (
            callable(fetch_fn) and inspect.iscoroutinefunction(fetch_fn.__call__)
        )
        self._missing_object_value = missing_object_value
        # Lock for thread-safe cache access
        self._lock = asyncio.Lock()

    async def aget(self, key: str) -> Any:
        """Async get value from cache or fetch it if needed."""

        if not self._is_async:
            raise ValueError("Cannot use async get() with sync fetch function")

        async with self._lock:
            value, should_fetch = self._check_cache(key)
            if not should_fetch:
                return value

            # Fetch new value
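The excerpt stops before the fetch, expiry, and eviction logic, so the following is only a sketch of the policy the docstring describes: least-recently-used eviction combined with a short expiry for missing objects and a longer one for existing objects. It is not Ray's implementation. `TinyCloudCache`, `MISSING`, the injectable `clock` parameter, and the `adapter-*` keys are all hypothetical, and the sketch is sync-only to stay short.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Optional, Tuple

MISSING = object()  # hypothetical sentinel meaning "object not found"


class TinyCloudCache:
    """Illustrative LRU cache with separate TTLs for hits and misses."""

    def __init__(
        self,
        max_size: int,
        fetch_fn: Callable[[str], Any],
        missing_expire_seconds: Optional[float] = None,
        exists_expire_seconds: Optional[float] = None,
        clock: Callable[[], float] = time.monotonic,  # injectable for tests
    ) -> None:
        # Maps key -> (value, expiry timestamp or None for "never expires").
        self._entries: "OrderedDict[str, Tuple[Any, Optional[float]]]" = OrderedDict()
        self._max_size = max_size
        self._fetch_fn = fetch_fn
        self._missing_ttl = missing_expire_seconds
        self._exists_ttl = exists_expire_seconds
        self._clock = clock

    def get(self, key: str) -> Any:
        entry = self._entries.get(key)
        if entry is not None:
            value, expires_at = entry
            if expires_at is None or self._clock() < expires_at:
                self._entries.move_to_end(key)  # mark as most recently used
                return value
            del self._entries[key]  # expired: fall through and refetch

        value = self._fetch_fn(key)
        # Misses get the (typically shorter) missing TTL, hits the exists TTL.
        ttl = self._missing_ttl if value is MISSING else self._exists_ttl
        expires_at = None if ttl is None else self._clock() + ttl
        self._entries[key] = (value, expires_at)
        if len(self._entries) > self._max_size:
            self._entries.popitem(last=False)  # evict least recently used
        return value


# Usage with a fake store and a manually advanced clock.
store = {"adapter-a": b"weights"}
calls = []


def fetch(key: str) -> Any:
    calls.append(key)
    return store.get(key, MISSING)


now = [0.0]
cache = TinyCloudCache(
    max_size=2,
    fetch_fn=fetch,
    missing_expire_seconds=1,
    exists_expire_seconds=60,
    clock=lambda: now[0],
)
cache.get("adapter-a")
cache.get("adapter-a")  # served from cache, no second fetch
cache.get("adapter-b")  # not in the store: MISSING is cached for 1 second
now[0] = 2.0
cache.get("adapter-b")  # missing entry has expired, so it is fetched again
```

Caching the missing status briefly avoids hammering the object store for keys that do not exist, while still noticing fairly quickly when an object is uploaded later.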
