Helper to materialize a single OnDemandFeatureView.
(
self,
feature_view: OnDemandFeatureView,
start_date: datetime,
end_date: datetime,
full_feature_names: bool,
)
| 1969 | return dataset.with_retrieval_job(retrieval_job) |
| 1970 | |
| 1971 | def _materialize_odfv( |
| 1972 | self, |
| 1973 | feature_view: OnDemandFeatureView, |
| 1974 | start_date: datetime, |
| 1975 | end_date: datetime, |
| 1976 | full_feature_names: bool, |
| 1977 | ): |
| 1978 | """Helper to materialize a single OnDemandFeatureView.""" |
| 1979 | if not feature_view.source_feature_view_projections: |
| 1980 | print( |
| 1981 | f"[WARNING] ODFV {feature_view.name} materialization: No source feature views found." |
| 1982 | ) |
| 1983 | return |
| 1984 | start_date = utils.make_tzaware(start_date) |
| 1985 | end_date = utils.make_tzaware(end_date) |
| 1986 | |
| 1987 | source_features_from_projections = [] |
| 1988 | all_join_keys = set() |
| 1989 | entity_timestamp_col_names = set() |
| 1990 | source_fvs = { |
| 1991 | self._get_feature_view(p.name) |
| 1992 | for p in feature_view.source_feature_view_projections.values() |
| 1993 | } |
| 1994 | |
| 1995 | for source_fv in source_fvs: |
| 1996 | all_join_keys.update(source_fv.entities) |
| 1997 | if source_fv.batch_source: |
| 1998 | entity_timestamp_col_names.add(source_fv.batch_source.timestamp_field) |
| 1999 | |
| 2000 | for proj in feature_view.source_feature_view_projections.values(): |
| 2001 | source_features_from_projections.extend( |
| 2002 | [f"{proj.name}:{f.name}" for f in proj.features] |
| 2003 | ) |
| 2004 | |
| 2005 | all_join_keys = {key for key in all_join_keys if key} |
| 2006 | |
| 2007 | if not all_join_keys: |
| 2008 | print( |
| 2009 | f"[WARNING] ODFV {feature_view.name} materialization: No join keys found in source views. Cannot create entity_df. Skipping." |
| 2010 | ) |
| 2011 | return |
| 2012 | |
| 2013 | if len(entity_timestamp_col_names) > 1: |
| 2014 | print( |
| 2015 | f"[WARNING] ODFV {feature_view.name} materialization: Found multiple timestamp columns in sources ({entity_timestamp_col_names}). This is not supported. Skipping." |
| 2016 | ) |
| 2017 | return |
| 2018 | |
| 2019 | if not entity_timestamp_col_names: |
| 2020 | print( |
| 2021 | f"[WARNING] ODFV {feature_view.name} materialization: No batch sources with timestamp columns found for sources. Skipping." |
| 2022 | ) |
| 2023 | return |
| 2024 | |
| 2025 | event_timestamp_col = list(entity_timestamp_col_names)[0] |
| 2026 | all_source_dfs = [] |
| 2027 | provider = self._get_provider() |
| 2028 |
No test coverage detected for the code shown above (`_materialize_odfv`).