| 57 | |
| 58 | |
| 59 | class MonitoringService: |
def __init__(self, store: "FeatureStore"):
    """Bind the service to a feature store and reset its lazily built state.

    Args:
        store: Feature store whose ``config`` and provider the other
            methods of this service read from.
    """
    # Built on first access of the ``job_manager`` property.
    self._job_manager: Optional[DQMJobManager] = None
    self._store = store
    self._calculator = MetricsCalculator()
    # Filled in by ``_get_offline_store()`` the first time it is called.
    self._offline_store_cache = None
    # Flipped to True once ``_ensure_monitoring_tables()`` has completed.
    self._monitoring_tables_ensured = False
| 66 | |
def _get_offline_store(self):
    """Return the provider's offline store, resolving it at most once.

    The first call asks the store's provider for its ``offline_store`` and
    remembers the result; later calls return the remembered object as long
    as it is not ``None``.
    """
    cached = self._offline_store_cache
    if cached is not None:
        return cached

    provider = self._store._get_provider()
    self._offline_store_cache = provider.offline_store
    return self._offline_store_cache
| 71 | |
def _ensure_monitoring_tables(self):
    """Ask the offline store to ensure the monitoring tables, once per instance.

    Does nothing when a previous call already completed. The flag is set
    only after ``ensure_monitoring_tables`` returns, so a call that raises
    is retried on the next invocation.
    """
    if self._monitoring_tables_ensured:
        return

    offline_store = self._get_offline_store()
    offline_store.ensure_monitoring_tables(self._store.config)
    self._monitoring_tables_ensured = True
| 76 | |
@property
def job_manager(self) -> DQMJobManager:
    """Return the service's DQM job manager, creating it on first access.

    The manager is built from the offline store and the store config, and
    ``ensure_table()`` is called on it once before it is cached.

    Returns:
        The cached ``DQMJobManager`` instance.

    Raises:
        Whatever ``DQMJobManager(...)`` or ``ensure_table()`` raises. In
        that case nothing is cached, so the next access retries.
    """
    if self._job_manager is None:
        manager = DQMJobManager(self._get_offline_store(), self._store.config)
        # Cache only after ensure_table() succeeds. Caching first would leave
        # a manager whose table was never ensured on the instance, and every
        # later access would skip ensure_table() entirely.
        manager.ensure_table()
        self._job_manager = manager
    return self._job_manager
| 85 | |
| 86 | # ------------------------------------------------------------------ # |
| 87 | # Auto-compute: detect dates, compute all granularities |
| 88 | # ------------------------------------------------------------------ # |
| 89 | |
| 90 | def auto_compute( |
| 91 | self, |
| 92 | project: Optional[str] = None, |
| 93 | feature_view_name: Optional[str] = None, |
| 94 | ) -> Dict[str, Any]: |
| 95 | """Detect date ranges from source data and compute all granularities.""" |
| 96 | start_time = time.time() |
| 97 | self._ensure_monitoring_tables() |
| 98 | if project is None: |
| 99 | project = self._store.config.project |
| 100 | |
| 101 | feature_views = self._resolve_feature_views(project, feature_view_name) |
| 102 | total_features = 0 |
| 103 | total_views = 0 |
| 104 | granularities_computed = set() |
| 105 | |
| 106 | for fv in feature_views: |
| 107 | try: |
| 108 | feature_fields = self._classify_fields(fv) |
| 109 | if not feature_fields: |
| 110 | continue |
| 111 | |
| 112 | max_ts = self._get_max_timestamp(fv) |
| 113 | if max_ts is None: |
| 114 | logger.warning( |
| 115 | "No data found for feature view '%s', skipping", fv.name |
| 116 | ) |
no outgoing calls