Load a parquet file and write the feature values to the online store. Args: event (dict): payload containing the following keys: FEATURE_STORE_YAML_ENV_NAME: Base64 encoded feature store config; view_name: Name of FeatureView to be materialized; view_type: Type of FeatureView; path: Path to parquet batch file on S3 bucket. context (dict): Lambda runtime context, not used.
(event, context)
| 15 | |
| 16 | |
| 17 | def handler(event, context): |
| 18 | """Load a parquet file and write the feature values to the online store. |
| 19 | |
| 20 | Args: |
| 21 | event (dict): payload containing the following keys: |
| 22 | FEATURE_STORE_YAML_ENV_NAME: Base64 encoded feature store config |
| 23 | view_name: Name of FeatureView to be materialized |
| 24 | view_type: Type of FeatureView |
| 25 | path: Path to parquet batch file on S3 bucket |
| 26 | context (dict): Lambda runtime context, not used. |
| 27 | """ |
| 28 | logger.info(f"Received event: {event}") |
| 29 | |
| 30 | try: |
| 31 | config_base64 = event[FEATURE_STORE_YAML_ENV_NAME] |
| 32 | |
| 33 | config_bytes = base64.b64decode(config_base64) |
| 34 | |
| 35 | # Create a new unique directory for writing feature_store.yaml |
| 36 | with tempfile.TemporaryDirectory() as repo_posix_path: |
| 37 | repo_path = Path(repo_posix_path) |
| 38 | |
| 39 | with open(repo_path / "feature_store.yaml", "wb") as f: |
| 40 | f.write(config_bytes) |
| 41 | |
| 42 | # Initialize the feature store |
| 43 | store = FeatureStore(repo_path=str(repo_path.resolve())) |
| 44 | |
| 45 | view_name = event["view_name"] |
| 46 | view_type = event["view_type"] |
| 47 | path = event["path"] |
| 48 | |
| 49 | bucket, key = path[len("s3://") :].split("/", 1) |
| 50 | logger.info(f"Inferred Bucket: `{bucket}` Key: `{key}`") |
| 51 | |
| 52 | if view_type == "batch": |
| 53 | # TODO: This probably needs to be become `store.get_batch_feature_view` at some point. # noqa: E501,W505 |
| 54 | feature_view = store.get_feature_view(view_name) |
| 55 | else: |
| 56 | feature_view = store.get_stream_feature_view(view_name) |
| 57 | |
| 58 | logger.info( |
| 59 | f"Got Feature View: `{feature_view.name}`, \ |
| 60 | last updated: {feature_view.last_updated_timestamp}" |
| 61 | ) |
| 62 | |
| 63 | table = pq.read_table(path) |
| 64 | if feature_view.batch_source.field_mapping is not None: |
| 65 | table = _run_pyarrow_field_mapping( |
| 66 | table, feature_view.batch_source.field_mapping |
| 67 | ) |
| 68 | |
| 69 | join_key_to_value_type = { |
| 70 | entity.name: entity.dtype.to_value_type() |
| 71 | for entity in feature_view.entity_columns |
| 72 | } |
| 73 | |
| 74 | written_rows = 0 |
no test coverage detected