Apply aggregations using PandasBackend. Args: response_data: Either a pyarrow.Table or dict of lists containing the data; aggregations: List of Aggregation objects to apply; group_keys: List of column names to group by (optional); mode: Transformation mode ("python", "pandas", or "substrait").
(
response_data: Union[pyarrow.Table, Dict[str, List[Any]]],
aggregations,
group_keys: Optional[List[str]],
mode: str,
)
| 668 | |
| 669 | |
| 670 | def _apply_aggregations_to_response( |
| 671 | response_data: Union[pyarrow.Table, Dict[str, List[Any]]], |
| 672 | aggregations, |
| 673 | group_keys: Optional[List[str]], |
| 674 | mode: str, |
| 675 | ) -> Union[pyarrow.Table, Dict[str, List[Any]]]: |
| 676 | """ |
| 677 | Apply aggregations using PandasBackend. |
| 678 | |
| 679 | Args: |
| 680 | response_data: Either a pyarrow.Table or dict of lists containing the data |
| 681 | aggregations: List of Aggregation objects to apply |
| 682 | group_keys: List of column names to group by (optional) |
| 683 | mode: Transformation mode ("python", "pandas", or "substrait") |
| 684 | |
| 685 | Returns: |
| 686 | Aggregated data in the same format as input |
| 687 | |
| 688 | TODO: Consider refactoring to support backends other than pandas in the future. |
| 689 | """ |
| 690 | if not aggregations: |
| 691 | return response_data |
| 692 | |
| 693 | backend = PandasBackend() |
| 694 | |
| 695 | # Convert to pandas DataFrame |
| 696 | if isinstance(response_data, dict): |
| 697 | df = pd.DataFrame(response_data) |
| 698 | else: # pyarrow.Table |
| 699 | df = backend.from_arrow(response_data) |
| 700 | |
| 701 | if df.empty: |
| 702 | return response_data |
| 703 | |
| 704 | # Convert aggregations to agg_ops format |
| 705 | agg_ops = aggregation_specs_to_agg_ops( |
| 706 | aggregations, |
| 707 | time_window_unsupported_error_message=( |
| 708 | "Time window aggregation is not supported in online serving." |
| 709 | ), |
| 710 | ) |
| 711 | |
| 712 | # Apply aggregations using PandasBackend |
| 713 | if group_keys: |
| 714 | result_df = backend.groupby_agg(df, group_keys, agg_ops) |
| 715 | else: |
| 716 | # No grouping - aggregate over entire dataset |
| 717 | result_df = backend.groupby_agg(df, [], agg_ops) |
| 718 | |
| 719 | # Convert back to original format |
| 720 | if mode == "python": |
| 721 | return {col: result_df[col].tolist() for col in result_df.columns} |
| 722 | else: # pandas or substrait |
| 723 | return backend.to_arrow(result_df) |
| 724 | |
| 725 | |
| 726 | def _augment_response_with_on_demand_transforms( |