MCPcopy Index your code
hub / github.com/feast-dev/feast / _apply_aggregations_to_response

Function _apply_aggregations_to_response

sdk/python/feast/utils.py:670–723  ·  view source on GitHub ↗

Apply aggregations using PandasBackend. Args: response_data: Either a pyarrow.Table or dict of lists containing the data aggregations: List of Aggregation objects to apply group_keys: List of column names to group by (optional) mode: Transformation mode ("py

(
    response_data: Union[pyarrow.Table, Dict[str, List[Any]]],
    aggregations,
    group_keys: Optional[List[str]],
    mode: str,
)

Source from the content-addressed store, hash-verified

668
669
670def _apply_aggregations_to_response(
671 response_data: Union[pyarrow.Table, Dict[str, List[Any]]],
672 aggregations,
673 group_keys: Optional[List[str]],
674 mode: str,
675) -> Union[pyarrow.Table, Dict[str, List[Any]]]:
676 """
677 Apply aggregations using PandasBackend.
678
679 Args:
680 response_data: Either a pyarrow.Table or dict of lists containing the data
681 aggregations: List of Aggregation objects to apply
682 group_keys: List of column names to group by (optional)
683 mode: Transformation mode ("python", "pandas", or "substrait")
684
685 Returns:
686 Aggregated data in the same format as input
687
688 TODO: Consider refactoring to support backends other than pandas in the future.
689 """
690 if not aggregations:
691 return response_data
692
693 backend = PandasBackend()
694
695 # Convert to pandas DataFrame
696 if isinstance(response_data, dict):
697 df = pd.DataFrame(response_data)
698 else: # pyarrow.Table
699 df = backend.from_arrow(response_data)
700
701 if df.empty:
702 return response_data
703
704 # Convert aggregations to agg_ops format
705 agg_ops = aggregation_specs_to_agg_ops(
706 aggregations,
707 time_window_unsupported_error_message=(
708 "Time window aggregation is not supported in online serving."
709 ),
710 )
711
712 # Apply aggregations using PandasBackend
713 if group_keys:
714 result_df = backend.groupby_agg(df, group_keys, agg_ops)
715 else:
716 # No grouping - aggregate over entire dataset
717 result_df = backend.groupby_agg(df, [], agg_ops)
718
719 # Convert back to original format
720 if mode == "python":
721 return {col: result_df[col].tolist() for col in result_df.columns}
722 else: # pandas or substrait
723 return backend.to_arrow(result_df)
724
725
726def _augment_response_with_on_demand_transforms(

Calls 5

from_arrowMethod · 0.95
groupby_aggMethod · 0.95
to_arrowMethod · 0.95
PandasBackendClass · 0.90