MCPcopy
hub / github.com/feast-dev/feast / handler

Function handler

sdk/python/feast/infra/compute_engines/aws_lambda/app.py:17–93  ·  view source on GitHub ↗

Load a parquet file and write the feature values to the online store. Args: event (dict): payload containing the following keys: FEATURE_STORE_YAML_ENV_NAME: Base64 encoded feature store config view_name: Name of FeatureView to be materialized view_ty

(event, context)

Source from the content-addressed store, hash-verified

15
16
17def handler(event, context):
18 """Load a parquet file and write the feature values to the online store.
19
20 Args:
21 event (dict): payload containing the following keys:
22 FEATURE_STORE_YAML_ENV_NAME: Base64 encoded feature store config
23 view_name: Name of FeatureView to be materialized
24 view_type: Type of FeatureView
25 path: Path to parquet batch file on S3 bucket
26 context (dict): Lambda runtime context, not used.
27 """
28 logger.info(f"Received event: {event}")
29
30 try:
31 config_base64 = event[FEATURE_STORE_YAML_ENV_NAME]
32
33 config_bytes = base64.b64decode(config_base64)
34
35 # Create a new unique directory for writing feature_store.yaml
36 with tempfile.TemporaryDirectory() as repo_posix_path:
37 repo_path = Path(repo_posix_path)
38
39 with open(repo_path / "feature_store.yaml", "wb") as f:
40 f.write(config_bytes)
41
42 # Initialize the feature store
43 store = FeatureStore(repo_path=str(repo_path.resolve()))
44
45 view_name = event["view_name"]
46 view_type = event["view_type"]
47 path = event["path"]
48
49 bucket, key = path[len("s3://") :].split("/", 1)
50 logger.info(f"Inferred Bucket: `{bucket}` Key: `{key}`")
51
52 if view_type == "batch":
53 # TODO: This probably needs to be become `store.get_batch_feature_view` at some point. # noqa: E501,W505
54 feature_view = store.get_feature_view(view_name)
55 else:
56 feature_view = store.get_stream_feature_view(view_name)
57
58 logger.info(
59 f"Got Feature View: `{feature_view.name}`, \
60 last updated: {feature_view.last_updated_timestamp}"
61 )
62
63 table = pq.read_table(path)
64 if feature_view.batch_source.field_mapping is not None:
65 table = _run_pyarrow_field_mapping(
66 table, feature_view.batch_source.field_mapping
67 )
68
69 join_key_to_value_type = {
70 entity.name: entity.dtype.to_value_type()
71 for entity in feature_view.entity_columns
72 }
73
74 written_rows = 0

Callers 2

embedMethod · 0.85
_run_agent_llmFunction · 0.85

Calls 9

get_feature_viewMethod · 0.95
FeatureStoreClass · 0.90
_convert_arrow_to_protoFunction · 0.90
exceptionMethod · 0.80
resolveMethod · 0.45
to_value_typeMethod · 0.45
online_write_batchMethod · 0.45

Tested by

no test coverage detected