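Push features to a push source. This updates all the feature views that have the push source as stream source. Args: push_source_name: The name of the push source we want to push data to. df: The data being pushed. allow_registry_cache: Whether to allow cached versions of the registry. to: Whether to push to online or offline store. Defaults to online store only. transform_on_write: Whether to transform the data before pushing.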
(
self,
push_source_name: str,
df: pd.DataFrame,
allow_registry_cache: bool = True,
to: PushMode = PushMode.ONLINE,
transform_on_write: bool = True,
)
| 2796 | ) |
| 2797 | |
def push(
    self,
    push_source_name: str,
    df: pd.DataFrame,
    allow_registry_cache: bool = True,
    to: PushMode = PushMode.ONLINE,
    transform_on_write: bool = True,
):
    """
    Write a dataframe to every feature view fed by the given push source.

    The feature views are obtained from ``_fvs_for_push_source_or_raise``
    and each one is written to the store(s) selected by ``to``. After the
    loop, if at least one feature view received an online write,
    ``_precompute_for_push`` is called once with the name of the first such
    feature view and ``df``.

    Args:
        push_source_name: The name of the push source we want to push data to.
        df: The data being pushed.
        allow_registry_cache: Whether to allow cached versions of the registry.
            Passed to the feature view lookup and to both store writes.
        to: Whether to push to online or offline store. Defaults to online
            store only.
        transform_on_write: Whether to transform the data before pushing.
            Only forwarded to the online store write.
    """
    # The destination is the same for every feature view, so resolve it once.
    wants_online = to == PushMode.ONLINE or to == PushMode.ONLINE_AND_OFFLINE
    wants_offline = to == PushMode.OFFLINE or to == PushMode.ONLINE_AND_OFFLINE

    # Names of the feature views written online, in iteration order.
    online_written = []
    matching_views = self._fvs_for_push_source_or_raise(
        push_source_name, allow_registry_cache
    )
    for view in matching_views:
        if wants_online:
            self.write_to_online_store(
                view.name,
                df,
                allow_registry_cache=allow_registry_cache,
                transform_on_write=transform_on_write,
            )
            online_written.append(view.name)
        if wants_offline:
            self.write_to_offline_store(
                view.name, df, allow_registry_cache=allow_registry_cache
            )

    # Nothing went to the online store: skip the precompute step.
    if not online_written:
        return

    # Precompute runs once per push, keyed on the first online-written view.
    self._precompute_for_push(online_written[0], df)
| 2835 | |
| 2836 | async def push_async( |
| 2837 | self, |