| 2834 | self._precompute_for_push(pushed_fv_names[0], df) |
| 2835 | |
async def push_async(
    self,
    push_source_name: str,
    df: pd.DataFrame,
    allow_registry_cache: bool = True,
    to: PushMode = PushMode.ONLINE,
    **kwargs,
):
    """Asynchronously push ``df`` to the feature views of a push source.

    The feature views are resolved once through
    ``_fvs_for_push_source_or_raise``; the same ``df`` is then written for
    each of them, by feature view name.

    Args:
        push_source_name: Name of the push source to resolve feature views for.
        df: Data to write; passed unchanged to every store write.
        allow_registry_cache: Forwarded to the feature view lookup and to
            every store write.
        to: Destination selector. ``ONLINE`` and ``OFFLINE`` write to a single
            store; ``ONLINE_AND_OFFLINE`` writes to both, online first.
        **kwargs: Accepted but not read by this method.
    """
    target_views = self._fvs_for_push_source_or_raise(
        push_source_name, allow_registry_cache
    )

    if to == PushMode.ONLINE or to == PushMode.ONLINE_AND_OFFLINE:
        # One coroutine per feature view, awaited together so the online
        # writes are not serialized one after another.
        online_writes = [
            self.write_to_online_store_async(
                view.name, df, allow_registry_cache=allow_registry_cache
            )
            for view in target_views
        ]
        await asyncio.gather(*online_writes)

    if to == PushMode.OFFLINE or to == PushMode.ONLINE_AND_OFFLINE:

        def _write_all_offline():
            # write_to_offline_store is a synchronous call; the views are
            # written one at a time, in lookup order.
            for view in target_views:
                self.write_to_offline_store(
                    view.name, df, allow_registry_cache=allow_registry_cache
                )

        # Hand the whole synchronous batch to run_in_threadpool as a single
        # callable and wait for it to finish.
        await run_in_threadpool(_write_all_offline)
| 2865 | |
| 2866 | def _validate_and_convert_input_data( |
| 2867 | self, |