Computes on demand feature values and adds them to the result rows. Assumes that 'online_features_response' already contains the necessary request data and input feature views for the on demand feature views. Unneeded feature values such as request data and unrequested input feature vie
(
online_features_response: GetOnlineFeaturesResponse,
feature_refs: List[str],
requested_on_demand_feature_views: List["OnDemandFeatureView"],
full_feature_names: bool,
feature_types: Optional[Dict[str, "ValueType"]] = None,
)
| 724 | |
| 725 | |
| 726 | def _augment_response_with_on_demand_transforms( |
| 727 | online_features_response: GetOnlineFeaturesResponse, |
| 728 | feature_refs: List[str], |
| 729 | requested_on_demand_feature_views: List["OnDemandFeatureView"], |
| 730 | full_feature_names: bool, |
| 731 | feature_types: Optional[Dict[str, "ValueType"]] = None, |
| 732 | ): |
| 733 | """Computes on demand feature values and adds them to the result rows. |
| 734 | |
| 735 | Assumes that 'online_features_response' already contains the necessary request data and input feature |
| 736 | views for the on demand feature views. Unneeded feature values such as request data and |
| 737 | unrequested input feature views will be removed from 'online_features_response'. |
| 738 | |
| 739 | Args: |
| 740 | online_features_response: Protobuf object to populate |
| 741 | feature_refs: List of all feature references to be returned. |
| 742 | requested_on_demand_feature_views: List of all odfvs that have been requested. |
| 743 | full_feature_names: A boolean that provides the option to add the feature view prefixes to the feature names, |
| 744 | changing them from the format "feature" to "feature_view__feature" (e.g., "daily_transactions" changes to |
| 745 | "customer_fv__daily_transactions"). |
| 746 | """ |
| 747 | from feast.online_response import OnlineResponse |
| 748 | |
| 749 | requested_odfv_map = {odfv.name: odfv for odfv in requested_on_demand_feature_views} |
| 750 | requested_odfv_feature_names = requested_odfv_map.keys() |
| 751 | |
| 752 | odfv_feature_refs = defaultdict(list) |
| 753 | for feature_ref in feature_refs: |
| 754 | view_name, _, feature_name = _parse_feature_ref(feature_ref) |
| 755 | if view_name in requested_odfv_feature_names: |
| 756 | odfv_feature_refs[view_name].append( |
| 757 | f"{requested_odfv_map[view_name].projection.name_to_use()}__{feature_name}" |
| 758 | if full_feature_names |
| 759 | else feature_name |
| 760 | ) |
| 761 | |
| 762 | initial_response = OnlineResponse( |
| 763 | online_features_response, feature_types=feature_types |
| 764 | ) |
| 765 | initial_response_arrow: Optional[pyarrow.Table] = None |
| 766 | initial_response_dict: Optional[Dict[str, List[Any]]] = None |
| 767 | |
| 768 | def _is_metrics_active(): |
| 769 | try: |
| 770 | from feast.metrics import _config |
| 771 | |
| 772 | return _config.online_features |
| 773 | except Exception: |
| 774 | return False |
| 775 | |
| 776 | _metrics_active = _is_metrics_active() |
| 777 | |
| 778 | # Apply on demand transformations and augment the result rows |
| 779 | odfv_result_names = set() |
| 780 | for odfv_name, _feature_refs in odfv_feature_refs.items(): |
| 781 | odfv = requested_odfv_map[odfv_name] |
| 782 | if not odfv.write_to_online_store: |
| 783 | _should_track = _metrics_active and getattr(odfv, "track_metrics", False) |
nothing calls this directly
no test coverage detected