Materialize incremental new data from the offline store into the online store. This method loads incremental new feature data up to the specified end time from either the specified feature views, or all feature views if none are specified, into the online store where it is available for online serving.
(
self,
end_date: datetime,
feature_views: Optional[List[str]] = None,
full_feature_names: bool = False,
version: Optional[str] = None,
)
| 2080 | self.write_to_online_store(feature_view.name, df=transformed_df) |
| 2081 | |
| 2082 | def materialize_incremental( |
| 2083 | self, |
| 2084 | end_date: datetime, |
| 2085 | feature_views: Optional[List[str]] = None, |
| 2086 | full_feature_names: bool = False, |
| 2087 | version: Optional[str] = None, |
| 2088 | ) -> None: |
| 2089 | """ |
| 2090 | Materialize incremental new data from the offline store into the online store. |
| 2091 | |
| 2092 | This method loads incremental new feature data up to the specified end time from either |
| 2093 | the specified feature views, or all feature views if none are specified, |
| 2094 | into the online store where it is available for online serving. The start time of |
| 2095 | the interval materialized is either the most recent end time of a prior materialization or |
| 2096 | (now - ttl) if no such prior materialization exists. |
| 2097 | |
| 2098 | Args: |
| 2099 | end_date (datetime): End date for time range of data to materialize into the online store |
| 2100 | feature_views (List[str]): Optional list of feature view names. If selected, will only run |
| 2101 | materialization for the specified feature views. |
| 2102 | full_feature_names (bool): If True, feature names will be prefixed with the corresponding |
| 2103 | feature view name. |
| 2104 | version (str): Optional version to materialize (e.g., 'v2'). Requires feature_views |
| 2105 | with exactly one entry and enable_online_feature_view_versioning to be enabled. |
| 2106 | |
| 2107 | Raises: |
| 2108 | Exception: A feature view being materialized does not have a TTL set. |
| 2109 | |
| 2110 | Examples: |
| 2111 | Materialize all features into the online store up to 5 minutes ago. |
| 2112 | |
| 2113 | >>> from feast import FeatureStore, RepoConfig |
| 2114 | >>> from datetime import datetime, timedelta |
| 2115 | >>> fs = FeatureStore(repo_path="project/feature_repo") |
| 2116 | >>> fs.materialize_incremental(end_date=_utc_now() - timedelta(minutes=5)) |
| 2117 | Materializing... |
| 2118 | <BLANKLINE> |
| 2119 | ... |
| 2120 | """ |
| 2121 | parsed_version = self._validate_materialize_version(version, feature_views) |
| 2122 | feature_views_to_materialize = self._get_feature_views_to_materialize( |
| 2123 | feature_views, version=parsed_version |
| 2124 | ) |
| 2125 | _print_materialization_log( |
| 2126 | None, |
| 2127 | end_date, |
| 2128 | len(feature_views_to_materialize), |
| 2129 | self.config.online_store.type, |
| 2130 | ) |
| 2131 | |
| 2132 | # Emit OpenLineage START event for incremental materialization |
| 2133 | ol_run_id = self._emit_openlineage_materialize_start( |
| 2134 | feature_views_to_materialize, None, end_date |
| 2135 | ) |
| 2136 | |
| 2137 | _mat_start = time.monotonic() |
| 2138 | try: |
| 2139 | # TODO paging large loads |