(request: PushFeaturesRequest)
| 497 | |
| 498 | @app.post("/push", dependencies=[Depends(inject_user_details)]) |
| 499 | async def push(request: PushFeaturesRequest) -> Response: |
| 500 | with feast_metrics.track_request_latency("/push"): |
| 501 | df = pd.DataFrame(request.df) |
| 502 | actions = [] |
| 503 | if request.to == "offline": |
| 504 | to = PushMode.OFFLINE |
| 505 | actions = [AuthzedAction.WRITE_OFFLINE] |
| 506 | elif request.to == "online": |
| 507 | to = PushMode.ONLINE |
| 508 | actions = [AuthzedAction.WRITE_ONLINE] |
| 509 | elif request.to == "online_and_offline": |
| 510 | to = PushMode.ONLINE_AND_OFFLINE |
| 511 | actions = WRITE |
| 512 | else: |
| 513 | raise ValueError( |
| 514 | f"{request.to} is not a supported push format. Please specify one of these ['online', 'offline', 'online_and_offline']." |
| 515 | ) |
| 516 | |
| 517 | from feast.data_source import PushSource |
| 518 | |
| 519 | all_fvs = store.list_feature_views( |
| 520 | allow_cache=request.allow_registry_cache |
| 521 | ) + store.list_stream_feature_views( |
| 522 | allow_cache=request.allow_registry_cache |
| 523 | ) |
| 524 | fvs_with_push_sources = { |
| 525 | fv |
| 526 | for fv in all_fvs |
| 527 | if ( |
| 528 | fv.stream_source is not None |
| 529 | and isinstance(fv.stream_source, PushSource) |
| 530 | and fv.stream_source.name == request.push_source_name |
| 531 | ) |
| 532 | } |
| 533 | |
| 534 | for feature_view in fvs_with_push_sources: |
| 535 | assert_permissions(resource=feature_view, actions=actions) |
| 536 | |
| 537 | async def _push_with_to(push_to: PushMode) -> None: |
| 538 | """ |
| 539 | Helper for performing a single push operation. |
| 540 | |
| 541 | NOTE: |
| 542 | - Feast providers **do not currently support async offline writes**. |
| 543 | - Therefore: |
| 544 | * ONLINE and ONLINE_AND_OFFLINE → may be async, depending on provider.async_supported.online.write |
| 545 | * OFFLINE → always synchronous, but executed via run_in_threadpool when called from HTTP handlers. |
| 546 | - The OfflineWriteBatcher handles offline writes directly in its own background thread, but the offline store writes are currently synchronous only. |
| 547 | """ |
| 548 | push_source_name = request.push_source_name |
| 549 | allow_registry_cache = request.allow_registry_cache |
| 550 | transform_on_write = request.transform_on_write |
| 551 | |
| 552 | # Async currently only applies to online store writes (ONLINE / ONLINE_AND_OFFLINE paths), as there is no async support for offline store writes |
| 553 | if push_to in (PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE) and ( |
| 554 | store._get_provider().async_supported.online.write |
| 555 | ): |
| 556 | await store.push_async( |
nothing calls this directly
no test coverage detected