MCPcopy Index your code
hub / github.com/feast-dev/feast / get_schema

Method get_schema

sdk/python/feast/feature_logging.py:50–102  ·  view source on GitHub ↗
(self, registry: "BaseRegistry")

Source from the content-addressed store, hash-verified

48 self._project = project
49
def get_schema(self, registry: "BaseRegistry") -> pa.Schema:
    """Build the Arrow schema of the feature log for this feature service.

    Columns are added per feature view projection, in projection order:

    1. For objects exposing ``source_request_sources``: every field of
       every request source. For all other objects: one column per entity
       column (skipping the dummy entity), named by the projection's join
       key mapping when one exists.
    2. For every projected feature: the value column, a ``__timestamp``
       column and a ``__status`` column.

    The system columns (log timestamp, log date, request id) are added
    last. A column name that occurs more than once keeps its first
    position and takes the most recently assigned type.

    Args:
        registry: Registry used to resolve each projection's feature view.

    Returns:
        A ``pa.Schema`` whose fields follow the insertion order above.

    Raises:
        FeastObjectNotFoundException: If a projection's feature view
            cannot be resolved from the registry. The original lookup
            error is attached as ``__cause__``.
    """
    fields: Dict[str, pa.DataType] = {}

    for projection in self._feature_service.feature_view_projections:
        # The order of fields in the generated schema should match
        # the order created on the other side (inside the Go logger).
        # Otherwise, some offline stores might not accept parquet files
        # (produced by Go). Go code can be found here:
        # https://github.com/feast-dev/feast/blob/master/go/internal/feast/server/logging/memorybuffer.go#L51
        try:
            feast_object = get_feature_view_from_registry(
                registry,
                projection.name,
                self._project,
            )
        except FeastObjectNotFoundException as err:
            # Chain explicitly so the traceback shows the lookup failure as
            # the direct cause rather than as an error inside this handler.
            raise FeastObjectNotFoundException(
                f"Can't recognize feature view with a name {projection.name}"
            ) from err

        if hasattr(feast_object, "source_request_sources"):
            # Objects with request sources contribute their request fields.
            for request_source in feast_object.source_request_sources.values():
                for field in request_source.schema:
                    fields[field.name] = FEAST_TYPE_TO_ARROW_TYPE[field.dtype]
        else:
            for entity_column in feast_object.entity_columns:
                if entity_column.name == DUMMY_ENTITY_ID:
                    continue

                # Use the projection's join key alias when one is mapped.
                join_key = projection.join_key_map.get(
                    entity_column.name, entity_column.name
                )
                fields[join_key] = FEAST_TYPE_TO_ARROW_TYPE[entity_column.dtype]

        for feature in projection.features:
            # Shared prefix of the three columns logged for each feature.
            feature_column = f"{projection.name_to_use()}__{feature.name}"
            fields[feature_column] = FEAST_TYPE_TO_ARROW_TYPE[feature.dtype]
            fields[f"{feature_column}__timestamp"] = PA_TIMESTAMP_TYPE
            fields[f"{feature_column}__status"] = pa.int32()

    # system columns
    fields[LOG_TIMESTAMP_FIELD] = pa.timestamp("us", tz=timezone.utc)
    fields[LOG_DATE_FIELD] = pa.date32()
    fields[REQUEST_ID_FIELD] = pa.string()

    return pa.schema(
        [pa.field(name, data_type) for name, data_type in fields.items()]
    )
103
def get_log_timestamp_column(self) -> str:
    """Return the name of the log timestamp column (``LOG_TIMESTAMP_FIELD``)."""
    timestamp_column: str = LOG_TIMESTAMP_FIELD
    return timestamp_column

Callers 2

_resolve_log_sourceMethod · 0.95

Calls 6

getMethod · 0.80
name_to_useMethod · 0.80
valuesMethod · 0.45
schemaMethod · 0.45

Tested by

no test coverage detected