Persists a dataframe to the online store asynchronously. Args: feature_view_name: The feature view to which the dataframe corresponds. df: The dataframe to be persisted. inputs: Optional dictionary or dataframe of values to be written. allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
(
self,
feature_view_name: str,
df: Optional[pd.DataFrame] = None,
inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None,
allow_registry_cache: bool = True,
)
| 3126 | provider.ingest_df(feature_view, df) |
| 3127 | |
async def write_to_online_store_async(
    self,
    feature_view_name: str,
    df: Optional[pd.DataFrame] = None,
    inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None,
    allow_registry_cache: bool = True,
):
    """
    Asynchronously write feature data for a feature view to the online store.

    The write is skipped with a warning (and nothing is sent to the provider)
    when the resolved dataframe is empty, or when every value in the feature
    view's feature columns is null.

    Args:
        feature_view_name: The feature view to which the dataframe corresponds.
        df: The dataframe to be persisted.
        inputs: Optional dictionary or dataframe of values to be written.
        allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
    """
    feature_view, df = self._get_feature_view_and_df_for_online_write(
        feature_view_name=feature_view_name,
        df=df,
        inputs=inputs,
        allow_registry_cache=allow_registry_cache,
    )

    # Work out whether there is anything meaningful to write; both rejection
    # paths funnel into a single warn-and-return below.
    skip_reason = None
    if df is not None:
        if df.empty:
            skip_reason = "Cannot write empty dataframe to online store"
        else:
            # Entity columns may carry data while the feature columns do not,
            # so the feature columns are inspected on their own.
            feature_names = [feature.name for feature in feature_view.features]
            if feature_names:
                # NOTE(review): this selection raises KeyError if df lacks one
                # of the feature view's feature columns - confirm intended.
                feature_values = df[feature_names]
                if feature_values.empty or feature_values.isnull().all().all():
                    skip_reason = "Cannot write dataframe with empty feature columns to online store"

    if skip_reason is not None:
        warnings.warn(skip_reason)
        return

    online_provider = self._get_provider()
    await online_provider.ingest_df_async(feature_view, df)
| 3170 | |
| 3171 | async def update_online_store( |
| 3172 | self, |