(
self,
feature_view: Union[FeatureView, OnDemandFeatureView, LabelView],
project: str,
start_date: datetime,
end_date: datetime,
commit: bool = True,
)
| 1006 | return proto_registry_utils.get_data_source(registry_proto, name, project) |
| 1007 | |
| 1008 | def apply_materialization( |
| 1009 | self, |
| 1010 | feature_view: Union[FeatureView, OnDemandFeatureView, LabelView], |
| 1011 | project: str, |
| 1012 | start_date: datetime, |
| 1013 | end_date: datetime, |
| 1014 | commit: bool = True, |
| 1015 | ): |
| 1016 | if isinstance(feature_view, LabelView): |
| 1017 | raise ValueError( |
| 1018 | f"Cannot apply materialization for LabelView {feature_view.name}. " |
| 1019 | f"Use FeatureStore.push() to write labels." |
| 1020 | ) |
| 1021 | |
| 1022 | self._prepare_registry_for_changes(project) |
| 1023 | assert self.cached_registry_proto |
| 1024 | |
| 1025 | for idx, existing_feature_view_proto in enumerate( |
| 1026 | self.cached_registry_proto.feature_views |
| 1027 | ): |
| 1028 | if ( |
| 1029 | existing_feature_view_proto.spec.name == feature_view.name |
| 1030 | and existing_feature_view_proto.spec.project == project |
| 1031 | ): |
| 1032 | existing_feature_view = FeatureView.from_proto( |
| 1033 | existing_feature_view_proto |
| 1034 | ) |
| 1035 | existing_feature_view.materialization_intervals.append( |
| 1036 | (start_date, end_date) |
| 1037 | ) |
| 1038 | existing_feature_view.last_updated_timestamp = _utc_now() |
| 1039 | # Transition state to AVAILABLE_ONLINE after materialization. |
| 1040 | if hasattr(existing_feature_view, "state"): |
| 1041 | existing_feature_view.state = FeatureViewState.AVAILABLE_ONLINE |
| 1042 | feature_view_proto = existing_feature_view.to_proto() |
| 1043 | feature_view_proto.spec.project = project |
| 1044 | del self.cached_registry_proto.feature_views[idx] |
| 1045 | self.cached_registry_proto.feature_views.append(feature_view_proto) |
| 1046 | if commit: |
| 1047 | self.commit() |
| 1048 | return |
| 1049 | |
| 1050 | for idx, existing_stream_feature_view_proto in enumerate( |
| 1051 | self.cached_registry_proto.stream_feature_views |
| 1052 | ): |
| 1053 | if ( |
| 1054 | existing_stream_feature_view_proto.spec.name == feature_view.name |
| 1055 | and existing_stream_feature_view_proto.spec.project == project |
| 1056 | ): |
| 1057 | existing_stream_feature_view = StreamFeatureView.from_proto( |
| 1058 | existing_stream_feature_view_proto |
| 1059 | ) |
| 1060 | existing_stream_feature_view.materialization_intervals.append( |
| 1061 | (start_date, end_date) |
| 1062 | ) |
| 1063 | existing_stream_feature_view.last_updated_timestamp = _utc_now() |
| 1064 | # Transition state to AVAILABLE_ONLINE after materialization. |
| 1065 | if hasattr(existing_stream_feature_view, "state"): |
Nothing calls `apply_materialization` directly.
No test coverage was detected for it.