Add a new column to each group using an expression. The supplied expression is evaluated against every row in each group, and the resulting column is appended to the group's records. The output dataset preserves the original rows and columns. Examples: see the doctest in the method's docstring below.
(
self,
column_name: str,
expr: Expr,
**ray_remote_args,
)
| 310 | |
| 311 | @PublicAPI(api_group=EXPRESSION_API_GROUP, stability="alpha") |
| 312 | def with_column( |
| 313 | self, |
| 314 | column_name: str, |
| 315 | expr: Expr, |
| 316 | **ray_remote_args, |
| 317 | ) -> Dataset: |
| 318 | """Add a new column to each group using an expression. |
| 319 | |
| 320 | The supplied expression is evaluated against every row in each group, and |
| 321 | the resulting column is appended to the group's records. The output dataset |
| 322 | preserves the original rows and columns. |
| 323 | |
| 324 | Examples: |
| 325 | >>> import ray |
| 326 | >>> from ray.data.expressions import col |
| 327 | >>> ds = ( |
| 328 | ... ray.data.from_items([{"group": 1, "value": 1}, {"group": 1, "value": 2}]) |
| 329 | ... .groupby("group") |
| 330 | ... .with_column("value_twice", col("value") * 2) |
| 331 | ... .sort(["group", "value"]) |
| 332 | ... ) |
| 333 | >>> ds.take_all() |
| 334 | [{'group': 1, 'value': 1, 'value_twice': 2}, {'group': 1, 'value': 2, 'value_twice': 4}] |
| 335 | |
| 336 | Args: |
| 337 | column_name: Name of the column to add. |
| 338 | expr: Expression that yields the values for the new column. |
| 339 | **ray_remote_args: Additional resource requirements to request from Ray |
| 340 | for the underlying map tasks (for example, ``num_gpus=1``). |
| 341 | |
| 342 | Returns: |
| 343 | A new :class:`~ray.data.Dataset` containing all existing columns plus |
| 344 | the newly computed column. |
| 345 | """ |
| 346 | if not isinstance(column_name, str) or not column_name: |
| 347 | raise ValueError( |
| 348 | f"column_name must be a non-empty string, got: {column_name!r}" |
| 349 | ) |
| 350 | if not isinstance(expr, Expr): |
| 351 | raise TypeError( |
| 352 | "expr must be a Ray Data expression created via the expression API." |
| 353 | ) |
| 354 | if isinstance(expr, DownloadExpr): |
| 355 | raise TypeError( |
| 356 | "GroupedData.with_column does not yet support download expressions." |
| 357 | ) |
| 358 | |
| 359 | aliased_expr = expr.alias(column_name) |
| 360 | projection_exprs = [StarExpr(), aliased_expr] |
| 361 | |
| 362 | def _project_group(block: Block) -> Block: |
| 363 | from ray.data._internal.planner.plan_expression.expression_evaluator import ( |
| 364 | eval_projection, |
| 365 | ) |
| 366 | |
| 367 | return eval_projection(projection_exprs, block) |
| 368 | |
| 369 | return self.map_groups( |
nothing calls this directly
no test coverage detected