MCPcopy
hub / github.com/feast-dev/feast / run_demo

Function run_demo

sdk/python/feast/templates/spark/feature_repo/test_workflow.py:10–52  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

8
9
10def run_demo():
11 store = FeatureStore(repo_path=".")
12 print("\n--- Run feast apply to setup feature store on Snowflake ---")
13 subprocess.run(["feast", "apply"])
14
15 print("\n--- Historical features for training ---")
16 fetch_historical_features_entity_df(store, for_batch_scoring=False)
17
18 print("\n--- Historical features for batch scoring ---")
19 fetch_historical_features_entity_df(store, for_batch_scoring=True)
20
21 print("\n--- Load features into online store ---")
22 store.materialize_incremental(end_date=datetime.now())
23
24 print("\n--- Online features ---")
25 fetch_online_features(store, use_feature_service=False)
26
27 print("\n--- Online features retrieved (instead) through a feature service---")
28 fetch_online_features(store, use_feature_service=True)
29
30 print("\n--- Simulate a stream event ingestion of the hourly stats df ---")
31 event_df = pd.DataFrame.from_dict(
32 {
33 "driver_id": [1001],
34 "event_timestamp": [
35 datetime(2021, 5, 13, 10, 59, 42),
36 ],
37 "created": [
38 datetime(2021, 5, 13, 10, 59, 42),
39 ],
40 "conv_rate": [1.0],
41 "acc_rate": [1.0],
42 "avg_daily_trips": [1000],
43 }
44 )
45 print(event_df)
46 store.push("driver_stats_push_source", event_df, to=PushMode.ONLINE)
47
48 print("\n--- Online features again with updated values from a stream push---")
49 fetch_online_features(store, use_feature_service=True)
50
51 print("\n--- Run feast teardown ---")
52 subprocess.run(["feast", "teardown"])
53
54
55def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring: bool):

Callers 1

test_workflow.pyFile · 0.70

Calls 7

pushMethod · 0.95
FeatureStoreClass · 0.90
fetch_online_featuresFunction · 0.70
runMethod · 0.45
from_dictMethod · 0.45

Tested by

no test coverage detected