Compute label quality metrics with conflict policy enforcement.
(request: LabelQualityRequest)
| 274 | |
| 275 | @rest_app.post("/label-quality") |
| 276 | def label_quality(request: LabelQualityRequest): |
| 277 | """Compute label quality metrics with conflict policy enforcement.""" |
| 278 | try: |
| 279 | from datetime import datetime, timezone |
| 280 | |
| 281 | from feast.labeling.conflict_policy import ConflictPolicy |
| 282 | from feast.labeling.conflict_resolver import resolve_conflicts |
| 283 | |
| 284 | fv_name = request.feature_view |
| 285 | fv: Any = None |
| 286 | try: |
| 287 | fv = store.registry.get_label_view(fv_name, store.project) |
| 288 | except Exception: |
| 289 | try: |
| 290 | fv = store.get_feature_view(fv_name) |
| 291 | except Exception: |
| 292 | pass |
| 293 | |
| 294 | if fv is None: |
| 295 | return Response( |
| 296 | content=json.dumps({"detail": f"Label view '{fv_name}' not found"}), |
| 297 | status_code=status.HTTP_404_NOT_FOUND, |
| 298 | media_type="application/json", |
| 299 | ) |
| 300 | |
| 301 | batch_source = getattr(fv, "batch_source", None) |
| 302 | if batch_source is None: |
| 303 | return Response( |
| 304 | content=json.dumps({"detail": f"No batch source for '{fv_name}'"}), |
| 305 | status_code=status.HTTP_400_BAD_REQUEST, |
| 306 | media_type="application/json", |
| 307 | ) |
| 308 | |
| 309 | feature_names = [f.name for f in fv.features] |
| 310 | join_keys = ( |
| 311 | fv.join_keys |
| 312 | if hasattr(fv, "join_keys") |
| 313 | else (fv.entities if fv.entities else []) |
| 314 | ) |
| 315 | |
| 316 | provider = store._get_provider() |
| 317 | timestamp_field = batch_source.timestamp_field |
| 318 | labeler_field = getattr(fv, "labeler_field", "labeler") |
| 319 | conflict_policy = getattr( |
| 320 | fv, "conflict_policy", ConflictPolicy.LAST_WRITE_WINS |
| 321 | ) |
| 322 | |
| 323 | try: |
| 324 | job = provider.offline_store.pull_all_from_table_or_query( |
| 325 | config=store.config, |
| 326 | data_source=batch_source, |
| 327 | join_key_columns=join_keys, |
| 328 | feature_name_columns=feature_names, |
| 329 | timestamp_field=timestamp_field, |
| 330 | ) |
| 331 | raw_df = job.to_df() |
| 332 | except Exception: |
| 333 | return { |
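Nothing calls this function directly; it is registered as the POST "/label-quality" route handler via the `@rest_app.post` decorator.
No test coverage was detected for it.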