| 344 | |
| 345 | |
| 346 | class CloudObjectCache: |
| 347 | """A cache that works with both sync and async fetch functions. |
| 348 | |
| 349 | The purpose of this data structure is to cache the result of a function call |
| 350 | usually used to fetch a value from a cloud object store. |
| 351 | |
| 352 | The idea is this: |
| 353 | - Cloud operations are expensive |
| 354 | - In LoRA specifically, without a cache we would hit remote storage to download the |
| 355 | model weights on every request. |
| 356 | - If the same model is requested many times, we don't want to inflate the time to first token. |
| 357 | - The cache is bounded in two ways: least-recently-used eviction and time-based |
| 358 | expiration of entries. |
| 359 | - A missing object is cached only for a short duration, while an existing object is |
| 360 | cached for a longer one. |
| 361 | """ |
| 362 | |
| 363 | def __init__( |
| 364 | self, |
| 365 | max_size: int, |
| 366 | fetch_fn: Union[Callable[[str], Any], Callable[[str], Awaitable[Any]]], |
| 367 | missing_expire_seconds: Optional[int] = None, |
| 368 | exists_expire_seconds: Optional[int] = None, |
| 369 | missing_object_value: Any = object(), |
| 370 | ): |
| 371 | """Initialize the cache. |
| 372 | |
| 373 | Args: |
| 374 | max_size: Maximum number of items to store in cache |
| 375 | fetch_fn: Function to fetch values (can be sync or async) |
| 376 | missing_expire_seconds: How long to cache missing objects (None for no expiration) |
| 377 | exists_expire_seconds: How long to cache existing objects (None for no expiration) |
| 378 | missing_object_value: Sentinel value used to represent a missing object in the cache. |
| 379 | """ |
| 380 | self._cache: Dict[str, _CacheEntry] = {} |
| 381 | self._max_size = max_size |
| 382 | self._fetch_fn = fetch_fn |
| 383 | self._missing_expire_seconds = missing_expire_seconds |
| 384 | self._exists_expire_seconds = exists_expire_seconds |
| 385 | self._is_async = inspect.iscoroutinefunction(fetch_fn) or ( |
| 386 | callable(fetch_fn) and inspect.iscoroutinefunction(fetch_fn.__call__) |
| 387 | ) |
| 388 | self._missing_object_value = missing_object_value |
| 389 | # Serializes cache access across coroutines (asyncio.Lock is not thread-safe) |
| 390 | self._lock = asyncio.Lock() |
| 391 | |
| 392 | async def aget(self, key: str) -> Any: |
| 393 | """Async get value from cache or fetch it if needed.""" |
| 394 | |
| 395 | if not self._is_async: |
| 396 | raise ValueError("Cannot use async get() with sync fetch function") |
| 397 | |
| 398 | async with self._lock: |
| 399 | value, should_fetch = self._check_cache(key) |
| 400 | if not should_fetch: |
| 401 | return value |
| 402 | |
| 403 | # Fetch new value |
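
The policy described in the class docstring (LRU eviction plus separate expiry times for missing and existing objects) can be sketched in a few lines. This is a minimal, sync-only illustration and not the `CloudObjectCache` implementation: `TinyTTLCache`, `_Entry`, `MISSING`, and the injectable `clock` parameter are hypothetical names introduced here, and the real class's `_check_cache` and `_CacheEntry` may behave differently.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, NamedTuple, Optional

# Hypothetical sentinel, standing in for `missing_object_value`.
MISSING = object()


class _Entry(NamedTuple):
    value: Any
    expires_at: Optional[float]  # None means the entry never expires


class TinyTTLCache:
    """Illustrative cache: LRU eviction plus per-outcome expiry (assumed design)."""

    def __init__(
        self,
        max_size: int,
        fetch_fn: Callable[[str], Any],
        missing_expire_seconds: Optional[float] = None,
        exists_expire_seconds: Optional[float] = None,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._entries: "OrderedDict[str, _Entry]" = OrderedDict()
        self._max_size = max_size
        self._fetch_fn = fetch_fn
        self._missing_expire_seconds = missing_expire_seconds
        self._exists_expire_seconds = exists_expire_seconds
        self._clock = clock

    def get(self, key: str) -> Any:
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and (entry.expires_at is None or now < entry.expires_at):
            # Fresh hit: mark the key as most recently used.
            self._entries.move_to_end(key)
            return entry.value

        # Miss or expired entry: fetch, then pick the TTL based on the outcome.
        value = self._fetch_fn(key)
        if value is MISSING:
            ttl = self._missing_expire_seconds
        else:
            ttl = self._exists_expire_seconds
        self._entries[key] = _Entry(value, None if ttl is None else now + ttl)
        self._entries.move_to_end(key)

        # Evict least recently used keys until the size bound holds again.
        while len(self._entries) > self._max_size:
            self._entries.popitem(last=False)
        return value


if __name__ == "__main__":
    def fake_fetch(key: str) -> Any:
        # Hypothetical fetcher: pretend only "adapter-a" exists remotely.
        return {"weights": key} if key == "adapter-a" else MISSING

    cache = TinyTTLCache(
        max_size=2,
        fetch_fn=fake_fetch,
        missing_expire_seconds=5,
        exists_expire_seconds=300,
    )
    print(cache.get("adapter-a"))             # fetched once, then served from cache
    print(cache.get("adapter-b") is MISSING)  # the negative result is cached briefly
```

Caching the missing status for a shorter time than a hit means a newly uploaded object becomes visible quickly, while repeated requests for an absent key still avoid a remote round trip each time.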