MCPcopy Index your code
hub / github.com/feast-dev/feast / _convert_arrow_fv_to_proto

Function _convert_arrow_fv_to_proto

sdk/python/feast/utils.py:426–481  ·  view source on GitHub ↗
(
    table: Union[pyarrow.Table, pyarrow.RecordBatch],
    feature_view: "FeatureView",
    join_keys: Dict[str, ValueType],
)

Source from the content-addressed store, hash-verified

424
425
def _convert_arrow_fv_to_proto(
    table: Union[pyarrow.Table, pyarrow.RecordBatch],
    feature_view: "FeatureView",
    join_keys: Dict[str, ValueType],
) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]:
    """Convert each row of ``table`` into proto values for ``feature_view``.

    Args:
        table: Arrow data containing the feature columns of ``feature_view``,
            the ``join_keys`` columns, and the batch source's timestamp
            column(s). A ``pyarrow.Table`` is first turned into a single
            ``pyarrow.RecordBatch`` holding all of its rows.
        feature_view: View whose ``features`` are converted and whose
            ``batch_source`` names the event / created timestamp columns.
        join_keys: Join-key column name -> ``ValueType`` used to convert it.

    Returns:
        A list of ``(entity_key, feature_values, event_timestamp,
        created_timestamp)`` tuples, one per row. ``feature_values`` maps
        feature name to its ``ValueProto``; ``created_timestamp`` is ``None``
        on every row when the batch source has no created-timestamp column.

    Raises:
        ValueError: If ``feature_view`` has no ``batch_source``.
    """
    # Validate before doing any conversion work.
    if feature_view.batch_source is None:
        raise ValueError(
            f"Feature view '{feature_view.name}' has no batch_source and cannot be converted to proto."
        )
    batch_source = feature_view.batch_source

    # Work on a RecordBatch so columns are plain Arrays rather than
    # ChunkedArrays, keeping `to_numpy(zero_copy_only=...)` available.
    if isinstance(table, pyarrow.Table):
        batches = table.to_batches()
        if len(batches) == 1:
            # Common case: a single batch already covers every row; no copy.
            table = batches[0]
        else:
            # Multi-chunk (or chunk-less, empty) table: taking only the first
            # batch would silently drop rows (or raise IndexError), so merge
            # each column's chunks into one contiguous array instead.
            table = pyarrow.RecordBatch.from_arrays(
                [column.combine_chunks() for column in table.columns],
                schema=table.schema,
            )
    num_rows = table.num_rows

    # TODO: This will break if the feature view has aggregations or transformations
    feature_names = [feature.name for feature in feature_view.features]
    columns = [
        (feature.name, feature.dtype.to_value_type())
        for feature in feature_view.features
    ] + list(join_keys.items())

    # allow_missing=False: every requested column is expected in `table`
    # (exact failure mode is defined by the helper — confirm there).
    proto_values_by_column = _columns_to_proto_values(
        table, columns, allow_missing=False
    )

    entity_keys = _build_entity_keys(num_rows, join_keys, proto_values_by_column)

    # Serialize the features per row: transpose {name: column values} into a
    # list of {name: value} dicts, one per row.
    feature_columns = {name: proto_values_by_column[name] for name in feature_names}
    features: List[Dict[str, ValueProto]]
    if feature_columns:
        features = [
            dict(zip(feature_columns, row_values))
            for row_values in zip(*feature_columns.values())
        ]
    else:
        # zip() over zero columns yields no rows, which would make the final
        # zip truncate the whole result to []; emit one empty dict per row.
        features = [{} for _ in range(num_rows)]

    def _column_to_datetimes(column_name: str) -> list:
        # Shared conversion for the event and created timestamp columns.
        # zero_copy_only=False lets pyarrow copy whenever a zero-copy NumPy
        # view of the column is not possible.
        raw_values = table.column(column_name).to_numpy(zero_copy_only=False)
        return [_coerce_datetime(value) for value in pd.to_datetime(raw_values)]

    event_timestamps = _column_to_datetimes(batch_source.timestamp_field)

    # Convert created_timestamps if they exist
    if batch_source.created_timestamp_column:
        created_timestamps = _column_to_datetimes(
            batch_source.created_timestamp_column
        )
    else:
        created_timestamps = [None] * num_rows

    return list(zip(entity_keys, features, event_timestamps, created_timestamps))
482
483

Callers 1

_convert_arrow_to_proto — Function · 0.85

Calls 6

_columns_to_proto_values — Function · 0.85
_build_entity_keys — Function · 0.85
_coerce_datetime — Function · 0.85
column — Method · 0.80
to_value_type — Method · 0.45
values — Method · 0.45

Tested by

no test coverage detected