Persists a dataframe to the online store. Args: feature_view_name: The feature view to which the dataframe corresponds. df: The dataframe to be persisted. inputs: Optional dictionary (or dataframe) of values to be written. allow_registry_cache (opti
(
self,
feature_view_name: str,
df: Optional[pd.DataFrame] = None,
inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None,
allow_registry_cache: bool = True,
transform_on_write: bool = True,
)
| 3080 | return feature_view, df |
| 3081 | |
def write_to_online_store(
    self,
    feature_view_name: str,
    df: Optional[pd.DataFrame] = None,
    inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None,
    allow_registry_cache: bool = True,
    transform_on_write: bool = True,
):
    """
    Persists a dataframe to the online store.

    The feature view and the dataframe to write are resolved through
    ``_get_feature_view_and_df_for_online_write``. Nothing is written, and a
    warning is emitted instead, when the resolved dataframe has no rows or when
    the feature columns it contains hold only null values. Feature columns of
    the feature view that are absent from the dataframe are ignored by this
    check; the dataframe is then handed to the provider unchanged.

    Args:
        feature_view_name: The feature view to which the dataframe corresponds.
        df: The dataframe to be persisted.
        inputs: Optional dictionary (or dataframe) of values to be written.
        allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
        transform_on_write (optional): Whether to transform the data before pushing.
    """

    feature_view, df = self._get_feature_view_and_df_for_online_write(
        feature_view_name=feature_view_name,
        df=df,
        inputs=inputs,
        allow_registry_cache=allow_registry_cache,
        transform_on_write=transform_on_write,
    )

    # Validate that the dataframe has meaningful feature data
    if df is not None:
        if df.empty:
            warnings.warn("Cannot write empty dataframe to online store")
            return  # Early return for empty dataframe

        # Entity columns may carry data while every feature column is null.
        # Only columns actually present are inspected: selecting a missing
        # label would raise KeyError and turn this advisory check into a crash.
        present_feature_columns = [
            f.name for f in feature_view.features if f.name in df.columns
        ]
        if (
            present_feature_columns
            and df[present_feature_columns].isnull().all().all()
        ):
            warnings.warn(
                "Cannot write dataframe with empty feature columns to online store"
            )
            return  # Early return for empty feature columns

    provider = self._get_provider()
    provider.ingest_df(feature_view, df)
| 3127 | |
| 3128 | async def write_to_online_store_async( |
| 3129 | self, |