MCPcopy
hub / github.com/ray-project/ray / Project

Class Project

python/ray/data/_internal/logical/operators/map_operator.py:345–453  ·  view source on GitHub ↗

Logical operator for all Projection Operations.

Source from the content-addressed store, hash-verified

343
344@dataclass(frozen=True, repr=False, eq=False)
345class Project(AbstractMap, LogicalOperatorSupportsPredicatePassThrough):
346 """Logical operator for all Projection Operations."""
347
348 exprs: list["Expr"]
349 input_dependencies: list[LogicalOperator] = field(repr=False, kw_only=True)
350 compute: Optional[ComputeStrategy] = None
351 ray_remote_args: Dict[str, Any] = field(default_factory=dict)
352 ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None
353 can_modify_num_rows: bool = field(init=False, default=False)
354 min_rows_per_bundled_input: Optional[int] = field(init=False, default=None)
355 batch_size: Optional[int] = field(init=False, default=None)
356 batch_format: str = field(init=False, default="pyarrow")
357 zero_copy_batch: bool = field(init=False, default=True)
358 per_block_limit: Optional[int] = None
359 _num_outputs: Optional[int] = field(init=False, default=None, repr=False)
360
def __post_init__(self):
    """Finalize a freshly constructed ``Project``.

    The steps run in this order:

    1. Check that the operator has exactly one input dependency.
    2. If the input's inferred schema is a ``pyarrow.Schema``, replace
       ``exprs`` with the output of ``expand_star_exprs``. Otherwise the
       expression list is left untouched.
    3. If no compute strategy was supplied, derive one from the
       expressions via ``_detect_and_get_compute_strategy``.
    4. Require every expression that is not a ``StarExpr`` to be named.
    5. Set ``_num_outputs`` to ``None``.

    Raises:
        AssertionError: If the number of input dependencies is not one.
        TypeError: If an expression has no name and is not a ``StarExpr``.
    """
    dep_count = len(self.input_dependencies)
    assert dep_count == 1, dep_count

    # ``StarExpr`` is expanded eagerly whenever the input schema is known,
    # so that optimizer rules only ever see explicit ``col()`` and computed
    # expressions. With an opaque input schema (e.g., an upstream UDF map)
    # the ``StarExpr`` stays in place and runtime ``eval_projection``
    # expands it on a per-block basis.
    import pyarrow as pa

    # The dataclass is frozen, so fields are written through
    # ``object.__setattr__`` instead of plain attribute assignment.
    assign = object.__setattr__

    upstream_schema = self.input_dependencies[0].infer_schema()
    if isinstance(upstream_schema, pa.Schema):
        expanded = expand_star_exprs(self.exprs, upstream_schema)
        assign(self, "exprs", expanded)

    # Strategy detection runs after the optional star expansion, so it
    # inspects the expression list in its final form.
    if self.compute is None:
        strategy = self._detect_and_get_compute_strategy(self.exprs)
        assign(self, "compute", strategy)

    for projection in self.exprs:
        if projection.name is not None or isinstance(projection, StarExpr):
            continue
        raise TypeError(
            "All Project expressions must be named (use .alias(name) or col(name)), "
            "or be a star() expression."
        )

    assign(self, "_num_outputs", None)
387
388 def _detect_and_get_compute_strategy(self, exprs: list["Expr"]) -> ComputeStrategy:
389 """Detect if expressions contain callable class UDFs and return appropriate compute strategy.
390
391 If any expression contains a callable class UDF, returns ActorPoolStrategy.
392 Otherwise returns TaskPoolStrategy.
393 """
394 from ray.data._internal.planner.plan_expression.expression_visitors import (
395 _CallableClassUDFCollector,
396 )
397
398 # Check all expressions for callable class UDFs
399 for expr in exprs:
400 collector = _CallableClassUDFCollector()
401 collector.visit(expr)
402 if collector.get_callable_class_udfs():

Callers 11

with_column · Method · 0.90
select_columns · Method · 0.90
rename_columns · Method · 0.90
count · Method · 0.90
_create_project_chain · Method · 0.90
_try_fuse · Function · 0.90
__init__ · Method · 0.85
_generate_test_env · Function · 0.85

Calls

no outgoing calls

Tested by 3

_create_project_chain · Method · 0.72
__init__ · Method · 0.68
_generate_test_env · Function · 0.68

Used in the wild real call sites across dependent graphs

searching dependent graphs…