MCPcopy Index your code
hub / github.com/feast-dev/feast / _get_unique_entities

Function _get_unique_entities

sdk/python/feast/utils.py:958–1017  ·  view source on GitHub ↗

Return the set of unique composite Entities for a Feature View and the indexes at which they appear. This method allows us to query the OnlineStore for data we need only once rather than requesting and processing data for the same combination of Entities multiple times.

(
    table: "FeatureView",
    join_key_values: Dict[str, List[ValueProto]],
    entity_name_to_join_key_map: Dict[str, str],
)

Source from the content-addressed store, hash-verified

956
957
958def _get_unique_entities(
959 table: "FeatureView",
960 join_key_values: Dict[str, List[ValueProto]],
961 entity_name_to_join_key_map: Dict[str, str],
962) -> Tuple[Tuple[Dict[str, ValueProto], ...], Tuple[List[int], ...], int]:
963 """Return the set of unique composite Entities for a Feature View and the indexes at which they appear.
964
965 This method allows us to query the OnlineStore for data we need only once
966 rather than requesting and processing data for the same combination of
967 Entities multiple times.
968 """
969 # Get the correct set of entity values with the correct join keys.
970 table_entity_values = _get_table_entity_values(
971 table,
972 entity_name_to_join_key_map,
973 join_key_values,
974 )
975 # Validate that all expected join keys exist and have non-empty values.
976 expected_keys = set(entity_name_to_join_key_map.values())
977 expected_keys.discard("__dummy_id")
978 missing_keys = sorted(
979 list(set([key for key in expected_keys if key not in table_entity_values]))
980 )
981 empty_keys = sorted(
982 list(set([key for key in expected_keys if not table_entity_values.get(key)]))
983 )
984
985 if missing_keys or empty_keys:
986 if not any(table_entity_values.values()):
987 raise KeyError(
988 f"Missing join key values for keys: {missing_keys}. "
989 f"No values provided for keys: {empty_keys}. "
990 f"Provided join_key_values: {list(join_key_values.keys())}"
991 )
992
993 # Convert the column-oriented table_entity_values into row-wise data.
994 keys = list(table_entity_values.keys())
995 # Each row is a tuple of ValueProto objects corresponding to the join keys.
996 rowise = list(enumerate(zip(*table_entity_values.values())))
997
998 # If there are no rows, return empty tuples.
999 if not rowise:
1000 return (), (), 0
1001
1002 # Sort rowise so that rows with the same join key values are adjacent.
1003 rowise.sort(key=lambda row: tuple(getattr(x, x.WhichOneof("val")) for x in row[1]))
1004
1005 # Group rows by their composite join key value.
1006 groups = [
1007 (dict(zip(keys, key_tuple)), [idx for idx, _ in group])
1008 for key_tuple, group in itertools.groupby(rowise, key=lambda row: row[1])
1009 ]
1010
1011 # If no groups were formed (should not happen for valid input), return empty tuples.
1012 if not groups:
1013 return (), (), 0
1014
1015 # Unpack the unique entities and their original row indexes.

Callers

nothing calls this directly

Calls 5

_get_table_entity_values · Function · 0.85
get · Method · 0.80
keys · Method · 0.80
sort · Method · 0.80
values · Method · 0.45

Tested by

no test coverage detected