Get `ComputeStrategy` based on the function or class, and concurrency information. Args: fn: The function or generator to apply to a record batch, or a class type that can be instantiated to create such a callable. fn_constructor_args: Positional arguments to pass to ``fn``'s constructor.
(
fn: "UserDefinedFunction",
fn_constructor_args: Optional[Iterable[Any]] = None,
compute: Optional[Union[str, "ComputeStrategy"]] = None,
concurrency: Optional[Union[int, Tuple[int, int], Tuple[int, int, int]]] = None,
)
| 587 | |
| 588 | |
| 589 | def get_compute_strategy( |
| 590 | fn: "UserDefinedFunction", |
| 591 | fn_constructor_args: Optional[Iterable[Any]] = None, |
| 592 | compute: Optional[Union[str, "ComputeStrategy"]] = None, |
| 593 | concurrency: Optional[Union[int, Tuple[int, int], Tuple[int, int, int]]] = None, |
| 594 | ) -> "ComputeStrategy": |
| 595 | """Get `ComputeStrategy` based on the function or class, and concurrency |
| 596 | information. |
| 597 | |
| 598 | Args: |
| 599 | fn: The function or generator to apply to a record batch, or a class type |
| 600 | that can be instantiated to create such a callable. |
| 601 | fn_constructor_args: Positional arguments to pass to ``fn``'s constructor. |
| 602 | compute: Either "tasks" (default) to use Ray Tasks or an |
| 603 | :class:`~ray.data.ActorPoolStrategy` to use an autoscaling actor pool. |
| 604 | concurrency: The number of Ray workers to use concurrently. |
| 605 | |
| 606 | Returns: |
| 607 | The `ComputeStrategy` for execution. |
| 608 | """ |
| 609 | # Lazily import these objects to avoid circular imports. |
| 610 | from ray.data._internal.compute import ActorPoolStrategy, TaskPoolStrategy |
| 611 | from ray.data.block import CallableClass |
| 612 | |
| 613 | if isinstance(fn, CallableClass): |
| 614 | is_callable_class = True |
| 615 | else: |
| 616 | # TODO(chengsu): disallow object that is not a function. For example, |
| 617 | # An object instance of class often indicates a bug in user code. |
| 618 | is_callable_class = False |
| 619 | if fn_constructor_args is not None: |
| 620 | raise ValueError( |
| 621 | "``fn_constructor_args`` can only be specified if providing a " |
| 622 | f"callable class instance for ``fn``, but got: {fn}." |
| 623 | ) |
| 624 | |
| 625 | if compute is not None: |
| 626 | if is_callable_class and ( |
| 627 | compute == "tasks" or isinstance(compute, TaskPoolStrategy) |
| 628 | ): |
| 629 | raise ValueError( |
| 630 | f"You specified the callable class {fn} as your UDF with the compute " |
| 631 | f"{compute}, but Ray Data can't schedule callable classes with the task " |
| 632 | f"pool strategy. To fix this error, pass an ActorPoolStrategy to compute or " |
| 633 | f"None to use the default compute strategy." |
| 634 | ) |
| 635 | elif not is_callable_class and ( |
| 636 | compute == "actors" or isinstance(compute, ActorPoolStrategy) |
| 637 | ): |
| 638 | raise ValueError( |
| 639 | f"You specified the function {fn} as your UDF with the compute " |
| 640 | f"{compute}, but Ray Data can't schedule regular functions with the actor " |
| 641 | f"pool strategy. To fix this error, pass a TaskPoolStrategy to compute or " |
| 642 | f"None to use the default compute strategy." |
| 643 | ) |
| 644 | return compute |
| 645 | elif concurrency is not None: |
| 646 | # Legacy code path to support `concurrency` argument. |
no test coverage detected
searching dependent graphs…