Webhook endpoint for external annotation tools (Argilla, Label Studio) to push labels.
(request: WebhookPushRequest)
| 623 | |
@rest_app.post("/webhook/label-ingest")
def webhook_label_ingest(request: WebhookPushRequest):
    """Ingest labels pushed by an external annotation tool (Argilla, Label Studio).

    Records that lack an ``event_timestamp`` key are stamped with a single
    batch-wide UTC timestamp (ISO 8601), then the batch is pushed to the
    named push source with ``PushMode.ONLINE_AND_OFFLINE``.

    Args:
        request: Payload providing ``push_source_name`` and ``records``.
            Each record is treated as a mutable mapping; missing timestamps
            are filled in place.

    Returns:
        On success, a dict with ``status``, ``records_ingested`` and
        ``push_source``. On any failure, the exception is logged and the
        result of ``_safe_error_response("Webhook label ingest")`` is
        returned instead of propagating the error.
    """
    from datetime import datetime, timezone

    try:
        if not request.records:
            # An empty batch would build a column-less DataFrame; there is
            # nothing to ingest, so skip the push entirely.
            return {
                "status": "ok",
                "records_ingested": 0,
                "push_source": request.push_source_name,
            }

        # Compute the fallback once so every record defaulted in this batch
        # carries the same timestamp instead of drifting per iteration.
        default_timestamp = datetime.now(timezone.utc).isoformat()
        for record in request.records:
            record.setdefault("event_timestamp", default_timestamp)

        df = pd.DataFrame(request.records)
        store.push(
            request.push_source_name,
            df,
            to=feast.data_source.PushMode.ONLINE_AND_OFFLINE,
        )
        return {
            "status": "ok",
            "records_ingested": len(request.records),
            "push_source": request.push_source_name,
        }
    except Exception:
        # Top-level request boundary: log the full traceback server-side and
        # hand the caller a sanitized error payload.
        logger.exception("Webhook label ingest failed")
        return _safe_error_response("Webhook label ingest")
| 648 | |
| 649 | class TrainingExportRequest(BaseModel): |
| 650 | feature_service: str |
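Nothing in the codebase calls `webhook_label_ingest` directly; it is reachable only through the `/webhook/label-ingest` route.
No test coverage was detected for this endpoint.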