Return the output of `_get_online_request_context` plus the resolved on-demand feature view (ODFV) entity join keys, using a cache that is invalidated whenever the registry refreshes or its cache TTL expires.
(
registry,
project: str,
features: Union[List[str], "FeatureService"],
full_feature_names: bool,
)
| 1390 | |
| 1391 | |
| 1392 | def _get_cached_request_context( |
| 1393 | registry, |
| 1394 | project: str, |
| 1395 | features: Union[List[str], "FeatureService"], |
| 1396 | full_feature_names: bool, |
| 1397 | ): |
| 1398 | """Return the output of _get_online_request_context plus resolved ODFV |
| 1399 | entity join keys, using a cache that is invalidated whenever the |
| 1400 | registry refreshes or its cache TTL expires.""" |
| 1401 | from feast.feature_service import FeatureService as _FS |
| 1402 | |
| 1403 | global _feature_resolution_cache, _feature_resolution_registry_ts |
| 1404 | |
| 1405 | registry_ts = getattr(registry, "cached_registry_proto_created", None) |
| 1406 | |
| 1407 | if isinstance(features, _FS): |
| 1408 | features_key: tuple = ("__fs__", features.name) |
| 1409 | else: |
| 1410 | features_key = tuple(features) |
| 1411 | |
| 1412 | cache_key = (features_key, project, full_feature_names) |
| 1413 | |
| 1414 | is_cache_valid = getattr(registry, "is_cache_valid", None) |
| 1415 | registry_cache_valid = is_cache_valid() if callable(is_cache_valid) else False |
| 1416 | |
| 1417 | if registry_ts is not None and registry_cache_valid: |
| 1418 | with _feature_resolution_cache_lock: |
| 1419 | if registry_ts != _feature_resolution_registry_ts: |
| 1420 | _feature_resolution_cache.clear() |
| 1421 | _feature_resolution_registry_ts = registry_ts |
| 1422 | _logger.debug("Feature resolution cache cleared (registry refreshed)") |
| 1423 | else: |
| 1424 | cached = _feature_resolution_cache.get(cache_key) |
| 1425 | if cached is not None: |
| 1426 | return cached |
| 1427 | |
| 1428 | ctx = _get_online_request_context(registry, project, features, full_feature_names) |
| 1429 | |
| 1430 | ( |
| 1431 | feature_refs, |
| 1432 | requested_on_demand_feature_views, |
| 1433 | entity_name_to_join_key_map, |
| 1434 | entity_type_map, |
| 1435 | join_keys_set, |
| 1436 | grouped_refs, |
| 1437 | requested_result_row_names, |
| 1438 | needed_request_data, |
| 1439 | entityless_case, |
| 1440 | ) = ctx |
| 1441 | |
| 1442 | odfv_join_keys: set = set() |
| 1443 | for on_demand_feature_view in requested_on_demand_feature_views: |
| 1444 | entities_for_odfv = getattr(on_demand_feature_view, "entities", []) |
| 1445 | if len(entities_for_odfv) > 0 and isinstance(entities_for_odfv[0], str): |
| 1446 | entities_for_odfv = [ |
| 1447 | registry.get_entity(entity_name, project, allow_cache=True) |
| 1448 | for entity_name in entities_for_odfv |
| 1449 | ] |
No test coverage detected.