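Compute and set the freshness gauge for every feature view (including stream feature views) in the registry. Freshness = now - most_recent_end_time (from materialization_intervals), measured in seconds. A higher value means the feature data is more stale; views with no most_recent_end_time are skipped.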
(
store: "FeatureStore",
)
| 489 | |
| 490 | |
def update_feature_freshness(
    store: "FeatureStore",
) -> None:
    """
    Compute and set the freshness gauge for every feature view in the registry.

    Freshness = now - most_recent_end_time (from materialization_intervals),
    expressed in seconds. A higher value means the feature data is more stale.

    Both regular and stream feature views are covered. Views whose
    ``most_recent_end_time`` is ``None`` are skipped, and a naive end time is
    interpreted as UTC before the difference is taken.

    This function is best-effort: every failure is logged at debug level and
    nothing is raised. A failure on one feature view does not prevent the
    remaining views from being updated.

    Args:
        store: Feature store that supplies the views (cached reads are
            allowed) and whose ``project`` is used as a gauge label.
    """
    try:
        all_views = [
            *store.list_feature_views(allow_cache=True),
            *store.list_stream_feature_views(allow_cache=True),
        ]
        project = store.project
    except Exception:
        # Without the view list or project label there is nothing to update.
        logger.debug("Failed to update feature freshness metrics", exc_info=True)
        return

    # One shared timestamp so all views are measured against the same instant.
    now = datetime.now(tz=timezone.utc)
    for fv in all_views:
        # Isolate each view: one bad view must not leave the others' gauges
        # stuck at their previous values.
        try:
            end_time = fv.most_recent_end_time
            if end_time is None:
                continue
            if end_time.tzinfo is None:
                # Aware and naive datetimes cannot be subtracted; treat naive
                # values as UTC.
                end_time = end_time.replace(tzinfo=timezone.utc)
            staleness = (now - end_time).total_seconds()
            feature_freshness_seconds.labels(
                feature_view=fv.name, project=project
            ).set(staleness)
        except Exception:
            logger.debug(
                "Failed to update feature freshness metrics for feature view %s",
                getattr(fv, "name", "<unknown>"),
                exc_info=True,
            )
| 516 | |
| 517 | |
| 518 | def monitor_resources(interval: int = 5): |