Logical operator for all Projection Operations.
| 343 | |
| 344 | @dataclass(frozen=True, repr=False, eq=False) |
| 345 | class Project(AbstractMap, LogicalOperatorSupportsPredicatePassThrough): |
| 346 | """Logical operator for all Projection Operations.""" |
| 347 | |
| 348 | exprs: list["Expr"] |
| 349 | input_dependencies: list[LogicalOperator] = field(repr=False, kw_only=True) |
| 350 | compute: Optional[ComputeStrategy] = None |
| 351 | ray_remote_args: Dict[str, Any] = field(default_factory=dict) |
| 352 | ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None |
| 353 | can_modify_num_rows: bool = field(init=False, default=False) |
| 354 | min_rows_per_bundled_input: Optional[int] = field(init=False, default=None) |
| 355 | batch_size: Optional[int] = field(init=False, default=None) |
| 356 | batch_format: str = field(init=False, default="pyarrow") |
| 357 | zero_copy_batch: bool = field(init=False, default=True) |
| 358 | per_block_limit: Optional[int] = None |
| 359 | _num_outputs: Optional[int] = field(init=False, default=None, repr=False) |
| 360 | |
def __post_init__(self):
    """Finalize a freshly constructed ``Project`` operator.

    The dataclass is declared ``frozen=True``, so every attribute write
    below goes through ``object.__setattr__``. The steps run in this order:

    1. Check that the operator has exactly one input dependency.
    2. If the input's inferred schema is a ``pyarrow.Schema``, replace
       ``exprs`` with the result of ``expand_star_exprs``.
    3. If no ``compute`` strategy was supplied, derive one from ``exprs``.
    4. Reject any expression that has no name and is not a ``StarExpr``.
    5. Set ``_num_outputs`` to ``None``.

    Raises:
        AssertionError: If the number of input dependencies is not 1.
        TypeError: If an expression is unnamed and is not a ``StarExpr``.
    """
    num_inputs = len(self.input_dependencies)
    assert num_inputs == 1, num_inputs

    import pyarrow as pa

    # The instance is frozen; bypass the dataclass-generated __setattr__.
    assign = object.__setattr__

    # Eager ``StarExpr`` expansion: when the upstream schema is known, the
    # projection list is rewritten here so that optimizer rules only ever
    # see explicit ``col()`` / computed expressions. When the schema is
    # opaque (e.g., an upstream UDF map), the ``StarExpr`` is kept and
    # runtime ``eval_projection`` expands it on a per-block basis.
    upstream_schema = self.input_dependencies[0].infer_schema()
    if isinstance(upstream_schema, pa.Schema):
        expanded = expand_star_exprs(self.exprs, upstream_schema)
        assign(self, "exprs", expanded)

    # Fall back to a strategy derived from the (possibly expanded) exprs.
    if self.compute is None:
        strategy = self._detect_and_get_compute_strategy(self.exprs)
        assign(self, "compute", strategy)

    # Every projection must be addressable by name, except star expressions.
    for candidate in self.exprs:
        if candidate.name is not None or isinstance(candidate, StarExpr):
            continue
        raise TypeError(
            "All Project expressions must be named (use .alias(name) or col(name)), "
            "or be a star() expression."
        )

    assign(self, "_num_outputs", None)
| 387 | |
| 388 | def _detect_and_get_compute_strategy(self, exprs: list["Expr"]) -> ComputeStrategy: |
| 389 | """Detect if expressions contain callable class UDFs and return appropriate compute strategy. |
| 390 | |
| 391 | If any expression contains a callable class UDF, returns ActorPoolStrategy. |
| 392 | Otherwise returns TaskPoolStrategy. |
| 393 | """ |
| 394 | from ray.data._internal.planner.plan_expression.expression_visitors import ( |
| 395 | _CallableClassUDFCollector, |
| 396 | ) |
| 397 | |
| 398 | # Check all expressions for callable class UDFs |
| 399 | for expr in exprs: |
| 400 | collector = _CallableClassUDFCollector() |
| 401 | collector.visit(expr) |
| 402 | if collector.get_callable_class_udfs(): |
no outgoing calls
searching dependent graphs…