MCPcopy Index your code
hub / github.com/feast-dev/feast / FeatureServiceLoggingSource

Class FeatureServiceLoggingSource

sdk/python/feast/feature_logging.py:45–105  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

43
44
class FeatureServiceLoggingSource(LoggingSource):
    """Logging source describing the log rows produced for one feature service.

    The log schema is derived from the feature views referenced by the
    feature service's projections, resolved through a registry within
    ``project``.
    """

    def __init__(self, feature_service: "FeatureService", project: str):
        """
        Args:
            feature_service: Feature service whose served features are logged.
            project: Project name used when looking up the projected feature
                views in the registry.
        """
        self._feature_service = feature_service
        self._project = project

    def get_schema(self, registry: "BaseRegistry") -> pa.Schema:
        """Build the Arrow schema of the log rows for this feature service.

        For every feature view projection of the feature service, in order,
        the schema gets:

        * if the resolved view has a ``source_request_sources`` attribute,
          every field of each of those request sources; otherwise the view's
          entity columns (except ``DUMMY_ENTITY_ID``), renamed through the
          projection's ``join_key_map``;
        * for each projected feature, a value column ``<view>__<feature>``,
          a ``<view>__<feature>__timestamp`` column and an ``int32``
          ``<view>__<feature>__status`` column, where ``<view>`` is
          ``projection.name_to_use()``.

        The log timestamp, log date and request id system columns come last.
        Columns are accumulated in an insertion-ordered dict. A name seen
        twice therefore keeps the position of its first occurrence and the
        type of its last.

        Args:
            registry: Registry used to resolve each projection's feature view.

        Returns:
            The Arrow schema. Its field order is intended to match the order
            produced by the Go logger.

        Raises:
            FeastObjectNotFoundException: If a projected feature view cannot
                be found in the registry.
        """
        fields: Dict[str, pa.DataType] = {}

        for projection in self._feature_service.feature_view_projections:
            # The order of fields in the generated schema should match
            # the order created on the other side (inside Go logger).
            # Otherwise, some offline stores might not accept parquet files (produced by Go).
            # Go code can be found here:
            # https://github.com/feast-dev/feast/blob/master/go/internal/feast/server/logging/memorybuffer.go#L51
            try:
                feast_object = get_feature_view_from_registry(
                    registry,
                    projection.name,
                    self._project,
                )
            except FeastObjectNotFoundException as e:
                # Re-raise with a message naming the projection. The registry
                # error is chained as the explicit cause.
                raise FeastObjectNotFoundException(
                    f"Can't recognize feature view with a name {projection.name}"
                ) from e

            if hasattr(feast_object, "source_request_sources"):
                # Views with request sources (presumably on-demand feature
                # views) log their request-time input fields.
                for request_source in feast_object.source_request_sources.values():
                    for field in request_source.schema:
                        fields[field.name] = FEAST_TYPE_TO_ARROW_TYPE[field.dtype]
            else:
                for entity_column in feast_object.entity_columns:
                    if entity_column.name == DUMMY_ENTITY_ID:
                        continue

                    # Log under the join key name the projection maps to.
                    join_key = projection.join_key_map.get(
                        entity_column.name, entity_column.name
                    )
                    fields[join_key] = FEAST_TYPE_TO_ARROW_TYPE[entity_column.dtype]

            # Same for every feature of this projection: compute it once.
            view_name = projection.name_to_use()
            for feature in projection.features:
                column = f"{view_name}__{feature.name}"
                fields[column] = FEAST_TYPE_TO_ARROW_TYPE[feature.dtype]
                fields[f"{column}__timestamp"] = PA_TIMESTAMP_TYPE
                fields[f"{column}__status"] = pa.int32()

        # system columns
        fields[LOG_TIMESTAMP_FIELD] = pa.timestamp("us", tz=timezone.utc)
        fields[LOG_DATE_FIELD] = pa.date32()
        fields[REQUEST_ID_FIELD] = pa.string()

        return pa.schema(
            [pa.field(name, data_type) for name, data_type in fields.items()]
        )

Calls

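No outgoing calls indexed (the source above does call `get_feature_view_from_registry`, `pa.schema`, and `pa.field`).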