(
table: Union[pyarrow.Table, pyarrow.RecordBatch],
feature_view: "FeatureView",
join_keys: Dict[str, ValueType],
)
| 424 | |
| 425 | |
def _convert_arrow_fv_to_proto(
    table: Union[pyarrow.Table, pyarrow.RecordBatch],
    feature_view: "FeatureView",
    join_keys: Dict[str, ValueType],
) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]:
    """Convert Arrow data for a feature view into one proto tuple per row.

    Args:
        table: Arrow data holding the feature columns, the join key columns,
            the batch source's timestamp field and, when configured, its
            created-timestamp column. A ``pyarrow.Table`` is converted one
            record batch at a time, so every row is included regardless of
            how the table is chunked.
        feature_view: Feature view supplying the feature fields and the
            batch source whose timestamp column names are read.
        join_keys: Mapping of join key column name to its value type.

    Returns:
        A list with one ``(entity_key, features, event_timestamp,
        created_timestamp)`` tuple per row, where ``features`` maps feature
        name to its proto value and ``created_timestamp`` is ``None`` when
        the batch source has no created-timestamp column. A table that
        yields no record batches produces an empty list.

    Raises:
        ValueError: If the feature view has no batch source.
    """
    batch_source = feature_view.batch_source
    if batch_source is None:
        raise ValueError(
            f"Feature view '{feature_view.name}' has no batch_source and cannot be converted to proto."
        )

    if isinstance(table, pyarrow.Table):
        # Table columns are ChunkedArrays; working on RecordBatches avoids
        # them. Every batch is converted (not just the first) so multi-chunk
        # tables do not lose rows, and a table with no batches yields [].
        rows: List[
            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
        ] = []
        for batch in table.to_batches():
            rows.extend(_convert_arrow_fv_to_proto(batch, feature_view, join_keys))
        return rows

    # TODO: This will break if the feature view has aggregations or transformations
    columns = [
        (field.name, field.dtype.to_value_type()) for field in feature_view.features
    ] + list(join_keys.items())

    proto_values_by_column = _columns_to_proto_values(
        table, columns, allow_missing=False
    )

    entity_keys = _build_entity_keys(table.num_rows, join_keys, proto_values_by_column)

    # Pivot the per-column proto values into one {feature name: value} dict per row.
    feature_dict = {
        feature.name: proto_values_by_column[feature.name]
        for feature in feature_view.features
    }
    features = [
        dict(zip(feature_dict, row_values))
        for row_values in zip(*feature_dict.values())
    ]

    # Convert event_timestamps
    event_timestamps = [
        _coerce_datetime(val)
        for val in pd.to_datetime(
            table.column(batch_source.timestamp_field).to_numpy(zero_copy_only=False)
        )
    ]

    # Convert created_timestamps if they exist
    if batch_source.created_timestamp_column:
        created_timestamps = [
            _coerce_datetime(val)
            for val in pd.to_datetime(
                table.column(batch_source.created_timestamp_column).to_numpy(
                    zero_copy_only=False
                )
            )
        ]
    else:
        created_timestamps = [None] * table.num_rows

    return list(zip(entity_keys, features, event_timestamps, created_timestamps))
| 482 | |
| 483 |
no test coverage detected