MCPcopy Index your code
hub / github.com/ray-project/ray / map_groups

Method map_groups

python/ray/data/grouped_data.py:95–309  ·  view source on GitHub ↗

Apply the given function to each group of records of this dataset.

While `map_groups()` is very flexible, note that it comes with downsides:

* It may be slower than using more specific methods such as `min()`, `max()`.
* It requires that each group fits in memory on a single node.

(
        self,
        fn: UserDefinedFunction[DataBatch, DataBatch],
        *,
        zero_copy_batch: bool = True,
        compute: Union[str, ComputeStrategy] = None,
        batch_format: Optional[str] = "default",
        fn_args: Optional[Iterable[Any]] = None,
        fn_kwargs: Optional[Dict[str, Any]] = None,
        fn_constructor_args: Optional[Iterable[Any]] = None,
        fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
        num_cpus: Optional[float] = None,
        num_gpus: Optional[float] = None,
        memory: Optional[float] = None,
        concurrency: Optional[Union[int, Tuple[int, int], Tuple[int, int, int]]] = None,
        ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
        **ray_remote_args,
    )

Source from the content-addressed store, hash-verified

93
94 @PublicAPI(api_group=FA_API_GROUP)
95 def map_groups(
96 self,
97 fn: UserDefinedFunction[DataBatch, DataBatch],
98 *,
99 zero_copy_batch: bool = True,
100 compute: Union[str, ComputeStrategy] = None,
101 batch_format: Optional[str] = "default",
102 fn_args: Optional[Iterable[Any]] = None,
103 fn_kwargs: Optional[Dict[str, Any]] = None,
104 fn_constructor_args: Optional[Iterable[Any]] = None,
105 fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
106 num_cpus: Optional[float] = None,
107 num_gpus: Optional[float] = None,
108 memory: Optional[float] = None,
109 concurrency: Optional[Union[int, Tuple[int, int], Tuple[int, int, int]]] = None,
110 ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
111 **ray_remote_args,
112 ) -> "Dataset":
113 """Apply the given function to each group of records of this dataset.
114
115 While map_groups() is very flexible, note that it comes with downsides:
116
117 * It may be slower than using more specific methods such as min(), max().
118 * It requires that each group fits in memory on a single node.
119
120 In general, prefer to use `aggregate()` instead of `map_groups()`.
121
122 .. warning::
123 Specifying both ``num_cpus`` and ``num_gpus`` for map tasks is experimental,
124 and may result in scheduling or stability issues. Please
125 `report any issues <https://github.com/ray-project/ray/issues/new/choose>`_
126 to the Ray team.
127
128 Examples:
129 >>> # Return a single record per group (list of multiple records in,
130 >>> # list of a single record out).
131 >>> import ray
132 >>> import pandas as pd
133 >>> import numpy as np
134 >>> # Get first value per group.
135 >>> ds = ray.data.from_items([ # doctest: +SKIP
136 ... {"group": 1, "value": 1},
137 ... {"group": 1, "value": 2},
138 ... {"group": 2, "value": 3},
139 ... {"group": 2, "value": 4}])
140 >>> ds.groupby("group").map_groups( # doctest: +SKIP
141 ... lambda g: {"result": np.array([g["value"][0]])})
142
143 >>> # Return multiple records per group (dataframe in, dataframe out).
144 >>> df = pd.DataFrame(
145 ... {"A": ["a", "a", "b"], "B": [1, 1, 3], "C": [4, 6, 5]}
146 ... )
147 >>> ds = ray.data.from_pandas(df) # doctest: +SKIP
148 >>> grouped = ds.groupby("A") # doctest: +SKIP
149 >>> grouped.map_groups( # doctest: +SKIP
150 ... lambda g: g.apply(
151 ... lambda c: c / g[c.name].sum() if c.name in ["B", "C"] else c
152 ... )

Calls 3

repartition (Method) · 0.45
sort (Method) · 0.45