MCPcopy
hub / github.com/feast-dev/feast / apply_materialization

Method apply_materialization

sdk/python/feast/infra/registry/registry.py:1008–1077  ·  view source on GitHub ↗
(
        self,
        feature_view: Union[FeatureView, OnDemandFeatureView, LabelView],
        project: str,
        start_date: datetime,
        end_date: datetime,
        commit: bool = True,
    )

Source from the content-addressed store, hash-verified

1006 return proto_registry_utils.get_data_source(registry_proto, name, project)
1007
1008 def apply_materialization(
1009 self,
1010 feature_view: Union[FeatureView, OnDemandFeatureView, LabelView],
1011 project: str,
1012 start_date: datetime,
1013 end_date: datetime,
1014 commit: bool = True,
1015 ):
1016 if isinstance(feature_view, LabelView):
1017 raise ValueError(
1018 f"Cannot apply materialization for LabelView {feature_view.name}. "
1019 f"Use FeatureStore.push() to write labels."
1020 )
1021
1022 self._prepare_registry_for_changes(project)
1023 assert self.cached_registry_proto
1024
1025 for idx, existing_feature_view_proto in enumerate(
1026 self.cached_registry_proto.feature_views
1027 ):
1028 if (
1029 existing_feature_view_proto.spec.name == feature_view.name
1030 and existing_feature_view_proto.spec.project == project
1031 ):
1032 existing_feature_view = FeatureView.from_proto(
1033 existing_feature_view_proto
1034 )
1035 existing_feature_view.materialization_intervals.append(
1036 (start_date, end_date)
1037 )
1038 existing_feature_view.last_updated_timestamp = _utc_now()
1039 # Transition state to AVAILABLE_ONLINE after materialization.
1040 if hasattr(existing_feature_view, "state"):
1041 existing_feature_view.state = FeatureViewState.AVAILABLE_ONLINE
1042 feature_view_proto = existing_feature_view.to_proto()
1043 feature_view_proto.spec.project = project
1044 del self.cached_registry_proto.feature_views[idx]
1045 self.cached_registry_proto.feature_views.append(feature_view_proto)
1046 if commit:
1047 self.commit()
1048 return
1049
1050 for idx, existing_stream_feature_view_proto in enumerate(
1051 self.cached_registry_proto.stream_feature_views
1052 ):
1053 if (
1054 existing_stream_feature_view_proto.spec.name == feature_view.name
1055 and existing_stream_feature_view_proto.spec.project == project
1056 ):
1057 existing_stream_feature_view = StreamFeatureView.from_proto(
1058 existing_stream_feature_view_proto
1059 )
1060 existing_stream_feature_view.materialization_intervals.append(
1061 (start_date, end_date)
1062 )
1063 existing_stream_feature_view.last_updated_timestamp = _utc_now()
1064 # Transition state to AVAILABLE_ONLINE after materialization.
1065 if hasattr(existing_stream_feature_view, "state"):

Callers

nothing calls this directly

Calls 5

commitMethod · 0.95
_utc_nowFunction · 0.90
from_protoMethod · 0.45
to_protoMethod · 0.45

Tested by

no test coverage detected