Apply the given function to each group of records of this dataset. While map_groups() is very flexible, note that it comes with downsides:

* It may be slower than using more specific methods such as min(), max().
* It requires that each group fits in memory on a single node.
(
self,
fn: UserDefinedFunction[DataBatch, DataBatch],
*,
zero_copy_batch: bool = True,
compute: Union[str, ComputeStrategy] = None,
batch_format: Optional[str] = "default",
fn_args: Optional[Iterable[Any]] = None,
fn_kwargs: Optional[Dict[str, Any]] = None,
fn_constructor_args: Optional[Iterable[Any]] = None,
fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
num_cpus: Optional[float] = None,
num_gpus: Optional[float] = None,
memory: Optional[float] = None,
concurrency: Optional[Union[int, Tuple[int, int], Tuple[int, int, int]]] = None,
ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
**ray_remote_args,
)
| 93 | |
| 94 | @PublicAPI(api_group=FA_API_GROUP) |
| 95 | def map_groups( |
| 96 | self, |
| 97 | fn: UserDefinedFunction[DataBatch, DataBatch], |
| 98 | *, |
| 99 | zero_copy_batch: bool = True, |
| 100 | compute: Union[str, ComputeStrategy] = None, |
| 101 | batch_format: Optional[str] = "default", |
| 102 | fn_args: Optional[Iterable[Any]] = None, |
| 103 | fn_kwargs: Optional[Dict[str, Any]] = None, |
| 104 | fn_constructor_args: Optional[Iterable[Any]] = None, |
| 105 | fn_constructor_kwargs: Optional[Dict[str, Any]] = None, |
| 106 | num_cpus: Optional[float] = None, |
| 107 | num_gpus: Optional[float] = None, |
| 108 | memory: Optional[float] = None, |
| 109 | concurrency: Optional[Union[int, Tuple[int, int], Tuple[int, int, int]]] = None, |
| 110 | ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None, |
| 111 | **ray_remote_args, |
| 112 | ) -> "Dataset": |
| 113 | """Apply the given function to each group of records of this dataset. |
| 114 | |
| 115 | While map_groups() is very flexible, note that it comes with downsides: |
| 116 | |
| 117 | * It may be slower than using more specific methods such as min(), max(). |
| 118 | * It requires that each group fits in memory on a single node. |
| 119 | |
| 120 | In general, prefer to use `aggregate()` instead of `map_groups()`. |
| 121 | |
| 122 | .. warning:: |
| 123 | Specifying both ``num_cpus`` and ``num_gpus`` for map tasks is experimental, |
| 124 | and may result in scheduling or stability issues. Please |
| 125 | `report any issues <https://github.com/ray-project/ray/issues/new/choose>`_ |
| 126 | to the Ray team. |
| 127 | |
| 128 | Examples: |
| 129 | >>> # Return a single record per group (list of multiple records in, |
| 130 | >>> # list of a single record out). |
| 131 | >>> import ray |
| 132 | >>> import pandas as pd |
| 133 | >>> import numpy as np |
| 134 | >>> # Get first value per group. |
| 135 | >>> ds = ray.data.from_items([ # doctest: +SKIP |
| 136 | ... {"group": 1, "value": 1}, |
| 137 | ... {"group": 1, "value": 2}, |
| 138 | ... {"group": 2, "value": 3}, |
| 139 | ... {"group": 2, "value": 4}]) |
| 140 | >>> ds.groupby("group").map_groups( # doctest: +SKIP |
| 141 | ... lambda g: {"result": np.array([g["value"][0]])}) |
| 142 | |
| 143 | >>> # Return multiple records per group (dataframe in, dataframe out). |
| 144 | >>> df = pd.DataFrame( |
| 145 | ... {"A": ["a", "a", "b"], "B": [1, 1, 3], "C": [4, 6, 5]} |
| 146 | ... ) |
| 147 | >>> ds = ray.data.from_pandas(df) # doctest: +SKIP |
| 148 | >>> grouped = ds.groupby("A") # doctest: +SKIP |
| 149 | >>> grouped.map_groups( # doctest: +SKIP |
| 150 | ... lambda g: g.apply( |
| 151 | ... lambda c: c / g[c.name].sum() if c.name in ["B", "C"] else c |
| 152 | ... ) |