MCPcopy Index your code
hub / github.com/feast-dev/feast / push

Function push

sdk/python/feast/feature_server.py:499–594  ·  view source on GitHub ↗
(request: PushFeaturesRequest)

Source from the content-addressed store, hash-verified

497
498 @app.post("/push", dependencies=[Depends(inject_user_details)])
499 async def push(request: PushFeaturesRequest) -> Response:
500 with feast_metrics.track_request_latency("/push"):
501 df = pd.DataFrame(request.df)
502 actions = []
503 if request.to == "offline":
504 to = PushMode.OFFLINE
505 actions = [AuthzedAction.WRITE_OFFLINE]
506 elif request.to == "online":
507 to = PushMode.ONLINE
508 actions = [AuthzedAction.WRITE_ONLINE]
509 elif request.to == "online_and_offline":
510 to = PushMode.ONLINE_AND_OFFLINE
511 actions = WRITE
512 else:
513 raise ValueError(
514 f"{request.to} is not a supported push format. Please specify one of these ['online', 'offline', 'online_and_offline']."
515 )
516
517 from feast.data_source import PushSource
518
519 all_fvs = store.list_feature_views(
520 allow_cache=request.allow_registry_cache
521 ) + store.list_stream_feature_views(
522 allow_cache=request.allow_registry_cache
523 )
524 fvs_with_push_sources = {
525 fv
526 for fv in all_fvs
527 if (
528 fv.stream_source is not None
529 and isinstance(fv.stream_source, PushSource)
530 and fv.stream_source.name == request.push_source_name
531 )
532 }
533
534 for feature_view in fvs_with_push_sources:
535 assert_permissions(resource=feature_view, actions=actions)
536
537 async def _push_with_to(push_to: PushMode) -> None:
538 """
539 Helper for performing a single push operation.
540
541 NOTE:
542 - Feast providers **do not currently support async offline writes**.
543 - Therefore:
544 * ONLINE and ONLINE_AND_OFFLINE → may be async, depending on provider.async_supported.online.write
545 * OFFLINE → always synchronous, but executed via run_in_threadpool when called from HTTP handlers.
546 - The OfflineWriteBatcher handles offline writes directly in its own background thread, but the offline store writes are currently synchronous only.
547 """
548 push_source_name = request.push_source_name
549 allow_registry_cache = request.allow_registry_cache
550 transform_on_write = request.transform_on_write
551
552 # Async currently only applies to online store writes (ONLINE / ONLINE_AND_OFFLINE paths) as theres no async for offline store
553 if push_to in (PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE) and (
554 store._get_provider().async_supported.online.write
555 ):
556 await store.push_async(

Callers

nothing calls this directly

Calls 5

assert_permissionsFunction · 0.90
_push_with_toFunction · 0.85
enqueueMethod · 0.80
list_feature_viewsMethod · 0.45

Tested by

no test coverage detected