MCPcopy
hub / github.com/ray-project/ray / get_compute_strategy

Function get_compute_strategy

python/ray/data/_internal/util.py:589–696  ·  view source on GitHub ↗

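Get `ComputeStrategy` based on the function or class, and concurrency information. Args: fn: The function or generator to apply to a record batch, or a class type that can be instantiated to create such a callable. fn_constructor_args: Positional arguments to pass to ``fn``'s constructor. compute: Either "tasks" (default) to use Ray Tasks or an :class:`~ray.data.ActorPoolStrategy` to use an autoscaling actor pool. concurrency: The number of Ray workers to use concurrently.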

(
    fn: "UserDefinedFunction",
    fn_constructor_args: Optional[Iterable[Any]] = None,
    compute: Optional[Union[str, "ComputeStrategy"]] = None,
    concurrency: Optional[Union[int, Tuple[int, int], Tuple[int, int, int]]] = None,
)

Source from the content-addressed store, hash-verified

587
588
589def get_compute_strategy(
590 fn: "UserDefinedFunction",
591 fn_constructor_args: Optional[Iterable[Any]] = None,
592 compute: Optional[Union[str, "ComputeStrategy"]] = None,
593 concurrency: Optional[Union[int, Tuple[int, int], Tuple[int, int, int]]] = None,
594) -> "ComputeStrategy":
595 """Get `ComputeStrategy` based on the function or class, and concurrency
596 information.
597
598 Args:
599 fn: The function or generator to apply to a record batch, or a class type
600 that can be instantiated to create such a callable.
601 fn_constructor_args: Positional arguments to pass to ``fn``'s constructor.
602 compute: Either "tasks" (default) to use Ray Tasks or an
603 :class:`~ray.data.ActorPoolStrategy` to use an autoscaling actor pool.
604 concurrency: The number of Ray workers to use concurrently.
605
606 Returns:
607 The `ComputeStrategy` for execution.
608 """
609 # Lazily import these objects to avoid circular imports.
610 from ray.data._internal.compute import ActorPoolStrategy, TaskPoolStrategy
611 from ray.data.block import CallableClass
612
613 if isinstance(fn, CallableClass):
614 is_callable_class = True
615 else:
616 # TODO(chengsu): disallow object that is not a function. For example,
617 # An object instance of class often indicates a bug in user code.
618 is_callable_class = False
619 if fn_constructor_args is not None:
620 raise ValueError(
621 "``fn_constructor_args`` can only be specified if providing a "
622 f"callable class instance for ``fn``, but got: {fn}."
623 )
624
625 if compute is not None:
626 if is_callable_class and (
627 compute == "tasks" or isinstance(compute, TaskPoolStrategy)
628 ):
629 raise ValueError(
630 f"You specified the callable class {fn} as your UDF with the compute "
631 f"{compute}, but Ray Data can't schedule callable classes with the task "
632 f"pool strategy. To fix this error, pass an ActorPoolStrategy to compute or "
633 f"None to use the default compute strategy."
634 )
635 elif not is_callable_class and (
636 compute == "actors" or isinstance(compute, ActorPoolStrategy)
637 ):
638 raise ValueError(
639 f"You specified the function {fn} as your UDF with the compute "
640 f"{compute}, but Ray Data can't schedule regular functions with the actor "
641 f"pool strategy. To fix this error, pass a TaskPoolStrategy to compute or "
642 f"None to use the default compute strategy."
643 )
644 return compute
645 elif concurrency is not None:
646 # Legacy code path to support `concurrency` argument.

Callers 4

map (method) · 0.90
flat_map (method) · 0.90
filter (method) · 0.90

Calls 3

ActorPoolStrategy (class) · 0.90
TaskPoolStrategy (class) · 0.90
warning (method) · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…