| 43 | |
| 44 | |
class FeatureServiceLoggingSource(LoggingSource):
    """Logging source whose log schema is derived from a feature service."""

    def __init__(self, feature_service: "FeatureService", project: str) -> None:
        """Store the feature service and project used to build the schema.

        Args:
            feature_service: Feature service whose feature view projections
                define the logged columns.
            project: Project name passed to the registry lookup.
        """
        self._feature_service = feature_service
        self._project = project

    def get_schema(self, registry: "BaseRegistry") -> pa.Schema:
        """Build the Arrow schema of the log produced for this feature service.

        For every feature view projection the schema contains, in order:
        the request-source fields or the (possibly remapped) entity join keys,
        then for each feature a value column, a ``__timestamp`` column and a
        ``__status`` column. The system columns are appended at the end.

        Args:
            registry: Registry used to resolve each projection's feature view.

        Returns:
            A ``pa.Schema`` whose fields follow the insertion order above.

        Raises:
            FeastObjectNotFoundException: If a projection's feature view
                cannot be found in the registry.
        """
        # A dict keeps insertion order and de-duplicates columns shared by
        # several projections (the first occurrence fixes the position).
        fields: Dict[str, pa.DataType] = {}

        for projection in self._feature_service.feature_view_projections:
            # The order of fields in the generated schema should match
            # the order created on the other side (inside Go logger).
            # Otherwise, some offline stores might not accept parquet files (produced by Go).
            # Go code can be found here:
            # https://github.com/feast-dev/feast/blob/master/go/internal/feast/server/logging/memorybuffer.go#L51
            try:
                feast_object = get_feature_view_from_registry(
                    registry,
                    projection.name,
                    self._project,
                )
            except FeastObjectNotFoundException as err:
                # Chain explicitly so the original lookup failure stays
                # attached as the cause of the re-raised error.
                raise FeastObjectNotFoundException(
                    f"Can't recognize feature view with a name {projection.name}"
                ) from err

            if hasattr(feast_object, "source_request_sources"):
                # Objects exposing request sources (presumably on-demand
                # feature views -- confirm) contribute their request fields.
                for request_source in feast_object.source_request_sources.values():
                    for field in request_source.schema:
                        fields[field.name] = FEAST_TYPE_TO_ARROW_TYPE[field.dtype]
            else:
                for entity_column in feast_object.entity_columns:
                    # The dummy entity column is never part of the log.
                    if entity_column.name == DUMMY_ENTITY_ID:
                        continue

                    # Use the projection's remapped join key when one exists.
                    join_key = projection.join_key_map.get(
                        entity_column.name, entity_column.name
                    )
                    fields[join_key] = FEAST_TYPE_TO_ARROW_TYPE[entity_column.dtype]

            # Loop-invariant prefix: resolve it once per projection instead of
            # three times per feature.
            prefix = projection.name_to_use()
            for feature in projection.features:
                column = f"{prefix}__{feature.name}"
                fields[column] = FEAST_TYPE_TO_ARROW_TYPE[feature.dtype]
                fields[f"{column}__timestamp"] = PA_TIMESTAMP_TYPE
                fields[f"{column}__status"] = pa.int32()

        # system columns
        fields[LOG_TIMESTAMP_FIELD] = pa.timestamp("us", tz=timezone.utc)
        fields[LOG_DATE_FIELD] = pa.date32()
        fields[REQUEST_ID_FIELD] = pa.string()

        return pa.schema(
            [pa.field(name, data_type) for name, data_type in fields.items()]
        )
no outgoing calls