Return the set of unique composite Entities for a Feature View and the indexes at which they appear. This method allows us to query the OnlineStore only once for each unique combination of Entities, rather than requesting and processing the same data multiple times.
(
table: "FeatureView",
join_key_values: Dict[str, List[ValueProto]],
entity_name_to_join_key_map: Dict[str, str],
)
| 956 | |
| 957 | |
| 958 | def _get_unique_entities( |
| 959 | table: "FeatureView", |
| 960 | join_key_values: Dict[str, List[ValueProto]], |
| 961 | entity_name_to_join_key_map: Dict[str, str], |
| 962 | ) -> Tuple[Tuple[Dict[str, ValueProto], ...], Tuple[List[int], ...], int]: |
| 963 | """Return the set of unique composite Entities for a Feature View and the indexes at which they appear. |
| 964 | |
| 965 | This method allows us to query the OnlineStore for data we need only once |
| 966 | rather than requesting and processing data for the same combination of |
| 967 | Entities multiple times. |
| 968 | """ |
| 969 | # Get the correct set of entity values with the correct join keys. |
| 970 | table_entity_values = _get_table_entity_values( |
| 971 | table, |
| 972 | entity_name_to_join_key_map, |
| 973 | join_key_values, |
| 974 | ) |
| 975 | # Validate that all expected join keys exist and have non-empty values. |
| 976 | expected_keys = set(entity_name_to_join_key_map.values()) |
| 977 | expected_keys.discard("__dummy_id") |
| 978 | missing_keys = sorted( |
| 979 | list(set([key for key in expected_keys if key not in table_entity_values])) |
| 980 | ) |
| 981 | empty_keys = sorted( |
| 982 | list(set([key for key in expected_keys if not table_entity_values.get(key)])) |
| 983 | ) |
| 984 | |
| 985 | if missing_keys or empty_keys: |
| 986 | if not any(table_entity_values.values()): |
| 987 | raise KeyError( |
| 988 | f"Missing join key values for keys: {missing_keys}. " |
| 989 | f"No values provided for keys: {empty_keys}. " |
| 990 | f"Provided join_key_values: {list(join_key_values.keys())}" |
| 991 | ) |
| 992 | |
| 993 | # Convert the column-oriented table_entity_values into row-wise data. |
| 994 | keys = list(table_entity_values.keys()) |
| 995 | # Each row is a tuple of ValueProto objects corresponding to the join keys. |
| 996 | rowise = list(enumerate(zip(*table_entity_values.values()))) |
| 997 | |
| 998 | # If there are no rows, return empty tuples. |
| 999 | if not rowise: |
| 1000 | return (), (), 0 |
| 1001 | |
| 1002 | # Sort rowise so that rows with the same join key values are adjacent. |
| 1003 | rowise.sort(key=lambda row: tuple(getattr(x, x.WhichOneof("val")) for x in row[1])) |
| 1004 | |
| 1005 | # Group rows by their composite join key value. |
| 1006 | groups = [ |
| 1007 | (dict(zip(keys, key_tuple)), [idx for idx, _ in group]) |
| 1008 | for key_tuple, group in itertools.groupby(rowise, key=lambda row: row[1]) |
| 1009 | ] |
| 1010 | |
| 1011 | # If no groups were formed (should not happen for valid input), return empty tuples. |
| 1012 | if not groups: |
| 1013 | return (), (), 0 |
| 1014 | |
| 1015 | # Unpack the unique entities and their original row indexes. |
nothing calls this directly
no test coverage detected