Get list of feature views and corresponding feature names based on feature references
(
features: List[str],
all_feature_views: List["FeatureView"],
all_on_demand_feature_views: List["OnDemandFeatureView"],
)
| 595 | |
| 596 | |
| 597 | def _group_feature_refs( |
| 598 | features: List[str], |
| 599 | all_feature_views: List["FeatureView"], |
| 600 | all_on_demand_feature_views: List["OnDemandFeatureView"], |
| 601 | ) -> Tuple[ |
| 602 | List[Tuple[Union["FeatureView", "OnDemandFeatureView"], List[str]]], |
| 603 | List[Tuple["OnDemandFeatureView", List[str]]], |
| 604 | ]: |
| 605 | """Get list of feature views and corresponding feature names based on feature references""" |
| 606 | |
| 607 | # view name to view proto |
| 608 | view_index: Dict[str, Union["FeatureView", "OnDemandFeatureView"]] = { |
| 609 | view.projection.name_to_use(): view for view in all_feature_views |
| 610 | } |
| 611 | |
| 612 | # on demand view to on demand view proto |
| 613 | on_demand_view_index: Dict[str, "OnDemandFeatureView"] = {} |
| 614 | for view in all_on_demand_feature_views: |
| 615 | if view.projection and not view.write_to_online_store: |
| 616 | on_demand_view_index[view.projection.name_to_use()] = view |
| 617 | elif view.projection and view.write_to_online_store: |
| 618 | # we insert the ODFV view to FVs for ones that are written to the online store |
| 619 | view_index[view.projection.name_to_use()] = view |
| 620 | |
| 621 | # view name to feature names |
| 622 | views_features = defaultdict(set) |
| 623 | |
| 624 | # on demand view name to feature names |
| 625 | on_demand_view_features = defaultdict(set) |
| 626 | |
| 627 | for ref in features: |
| 628 | fv_name, version_num, feat_name = _parse_feature_ref(ref) |
| 629 | # Build the key that matches projection.name_to_use() |
| 630 | if version_num is not None: |
| 631 | view_name = f"{fv_name}@v{version_num}" |
| 632 | else: |
| 633 | view_name = fv_name |
| 634 | if view_name in view_index: |
| 635 | if hasattr(view_index[view_name], "write_to_online_store"): |
| 636 | tmp_feat_name = [ |
| 637 | f for f in view_index[view_name].schema if f.name == feat_name |
| 638 | ] |
| 639 | if len(tmp_feat_name) > 0: |
| 640 | feat_name = tmp_feat_name[0].name |
| 641 | else: |
| 642 | view_index[view_name].projection.get_feature( |
| 643 | feat_name |
| 644 | ) # For validation |
| 645 | views_features[view_name].add(feat_name) |
| 646 | elif view_name in on_demand_view_index: |
| 647 | on_demand_view_index[view_name].projection.get_feature( |
| 648 | feat_name |
| 649 | ) # For validation |
| 650 | on_demand_view_features[view_name].add(feat_name) |
| 651 | # Let's also add in any FV Feature dependencies here. |
| 652 | for input_fv_projection in on_demand_view_index[ |
| 653 | view_name |
| 654 | ].source_feature_view_projections.values(): |
no test coverage detected