Read a stream from a custom :class:`~ray.data.Datasource`.
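Reading from a datasource can be pictured as splitting the input into independent read tasks, each of which produces one block. The sketch below illustrates that idea without Ray, so treat it as a toy model rather than Ray's implementation:

- `ToyRangeSource` and `read_all` are hypothetical names.
- Blocks are plain Python lists rather than Arrow or pandas blocks.
- `get_read_tasks` only loosely mirrors the abstract method of the same name on :class:`~ray.data.Datasource`, which returns ``ReadTask`` objects rather than bare callables.
- It is an assumption that `parallelism` here plays the role that Ray fills from ``override_num_blocks`` or its own heuristics.

```python
from typing import Callable, List

Block = List[int]  # toy stand-in for a Ray Data block (really Arrow/pandas)


class ToyRangeSource:
    """Hypothetical, Ray-free datasource that yields the integers 0..n-1."""

    def __init__(self, n: int):
        self.n = n

    def get_read_tasks(self, parallelism: int) -> List[Callable[[], Block]]:
        # Split [0, n) into at most `parallelism` contiguous, non-empty ranges.
        if self.n == 0:
            return []
        k = max(1, min(parallelism, self.n))
        base, extra = divmod(self.n, k)
        tasks: List[Callable[[], Block]] = []
        start = 0
        for i in range(k):
            size = base + (1 if i < extra else 0)  # first `extra` ranges get one more
            lo, hi = start, start + size
            # Bind lo/hi as defaults so each closure keeps its own range.
            tasks.append(lambda lo=lo, hi=hi: list(range(lo, hi)))
            start = hi
        return tasks


def read_all(source: ToyRangeSource, parallelism: int) -> List[Block]:
    # Each task is independent; here they run sequentially, whereas Ray
    # would schedule them as separate remote tasks.
    return [task() for task in source.get_read_tasks(parallelism)]
```

With ``n=10`` and ``parallelism=3``, the ranges have sizes 4, 3 and 3. When ``parallelism`` exceeds ``n``, this toy caps the task count at ``n``, so no task returns an empty block. How a real datasource splits its input is up to that datasource's implementation.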
read_datasource(
    datasource: Datasource,
    *,
    parallelism: int = -1,
    num_cpus: Optional[float] = None,
    num_gpus: Optional[float] = None,
    memory: Optional[float] = None,
    ray_remote_args: Optional[Dict[str, Any]] = None,
    concurrency: Optional[int] = None,
    compute: Optional[ComputeStrategy] = None,
    override_num_blocks: Optional[int] = None,
    **read_args,
) -> Dataset
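The ``num_cpus``, ``num_gpus`` and ``memory`` arguments in the signature above reserve logical resources for each parallel read worker. Together with ``concurrency``, they bound how many read tasks can run at once.

The back-of-the-envelope model below computes that upper bound for a list of nodes. It is an assumption-laden sketch, not Ray's scheduler:

- `Node` and `max_parallel_read_tasks` are hypothetical.
- When ``num_cpus`` is ``None``, it assumes a default of 1 CPU per task.
- It ignores other workloads on the cluster and any resource budgeting that Ray Data does on its own.

```python
from dataclasses import dataclass
from typing import List, Optional

# Resource amounts are compared in integer units of 1/10000 so that, e.g.,
# 0.6 CPUs // 0.2 CPUs gives 3 instead of float round-off giving 2.
_SCALE = 10_000


def _units(amount: float) -> int:
    return round(amount * _SCALE)


@dataclass
class Node:
    """Hypothetical snapshot of one node's free logical resources."""
    cpus: float
    gpus: float = 0.0
    memory: int = 0  # bytes


def max_parallel_read_tasks(
    nodes: List[Node],
    num_cpus: Optional[float] = None,
    num_gpus: Optional[float] = None,
    memory: Optional[int] = None,  # bytes per task
    concurrency: Optional[int] = None,
) -> int:
    """Upper bound on simultaneously running read tasks (toy model)."""
    # Assumption: a task that doesn't specify num_cpus reserves 1 CPU.
    cpus_per_task = 1.0 if num_cpus is None else num_cpus
    if _units(cpus_per_task) <= 0:
        raise ValueError("this toy model needs a positive CPU request per task")

    total = 0
    for node in nodes:
        # A task fits on a node only if every requested resource fits.
        fits = _units(node.cpus) // _units(cpus_per_task)
        if num_gpus:
            fits = min(fits, _units(node.gpus) // max(1, _units(num_gpus)))
        if memory:
            fits = min(fits, int(node.memory // memory))
        total += fits

    # concurrency can only lower the bound; it never adds capacity.
    if concurrency is not None:
        total = min(total, concurrency)
    return total
```

For example, two 8-CPU nodes with ``num_cpus=2`` give a bound of 8, and adding ``concurrency=3`` lowers it to 3. Requesting ``num_gpus=1`` on a node with 2 GPUs limits that node to 2 tasks, however many CPUs it has.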
@PublicAPI
@wrap_auto_init
def read_datasource(
    datasource: Datasource,
    *,
    parallelism: int = -1,
    num_cpus: Optional[float] = None,
    num_gpus: Optional[float] = None,
    memory: Optional[float] = None,
    ray_remote_args: Optional[Dict[str, Any]] = None,
    concurrency: Optional[int] = None,
    compute: Optional[ComputeStrategy] = None,
    override_num_blocks: Optional[int] = None,
    **read_args,
) -> Dataset:
    """Read a stream from a custom :class:`~ray.data.Datasource`.

    Args:
        datasource: The :class:`~ray.data.Datasource` to read data from.
        parallelism: This argument is deprecated. Use the ``override_num_blocks``
            argument instead.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify ``num_gpus=1`` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        concurrency: The maximum number of Ray tasks to run concurrently. This
            doesn't change the total number of tasks run or the total number of
            output blocks. By default, concurrency is decided dynamically based on
            the available resources.
        compute: The compute strategy to use for reading. Pass an
            :class:`~ray.data.ActorPoolStrategy` instance to use an actor pool, or a
            :class:`~ray.data.TaskPoolStrategy` instance to use Ray tasks. If not
            specified, defaults to ``TaskPoolStrategy(concurrency)``. If both
            ``compute`` and ``concurrency`` are specified, ``concurrency`` takes
            precedence.
        override_num_blocks: Override the number of output blocks from all read
            tasks. By default, the number of output blocks is decided dynamically
            based on input data size and available resources. In most cases, you
            shouldn't set this value manually.
        **read_args: Additional kwargs to pass to the :class:`~ray.data.Datasource`
            implementation.

    Returns:
        :class:`~ray.data.Dataset` that reads data from the :class:`~ray.data.Datasource`.

    Examples:
        Read using default task-based execution:

        >>> import ray
        >>> from ray.data._internal.datasource.range_datasource import RangeDatasource
        >>> datasource = RangeDatasource(n=1000, block_format="arrow")
        >>> ds = ray.data.read_datasource(datasource)  # doctest: +SKIP

        Read using actors for stateful operations:

        >>> from ray.data import ActorPoolStrategy
        >>> ds = ray.data.read_datasource(  # doctest: +SKIP
        ...     datasource,
        ...     compute=ActorPoolStrategy(size=4),  # Use 4 actors
        ... )
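The ``concurrency`` entry in the docstring above separates two quantities: how many tasks run at the same time, and how many tasks (and output blocks) there are in total. A thread pool offers a quick, Ray-free way to see that distinction. In this sketch, ``ThreadPoolExecutor(max_workers=concurrency)`` stands in for Ray's scheduling, and `make_task` and `run_read_tasks` are hypothetical helpers, not Ray APIs.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional, Tuple

Block = List[int]  # toy stand-in for an output block


def make_task(i: int) -> Callable[[], Block]:
    """Hypothetical read task: simulate some I/O, then return one block."""
    def task() -> Block:
        time.sleep(0.02)  # simulated read latency
        return [i]
    return task


def run_read_tasks(
    tasks: List[Callable[[], Block]], concurrency: Optional[int] = None
) -> Tuple[List[Block], int]:
    """Run every task with at most `concurrency` in flight; return blocks and peak."""
    lock = threading.Lock()
    in_flight = 0
    peak = 0

    def tracked(task: Callable[[], Block]) -> Block:
        nonlocal in_flight, peak
        with lock:
            in_flight += 1
            peak = max(peak, in_flight)
        try:
            return task()
        finally:
            with lock:
                in_flight -= 1

    # None: no explicit cap, so allow one worker per task. This is only a
    # stand-in for "decided dynamically based on the available resources".
    workers = concurrency if concurrency is not None else max(1, len(tasks))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        blocks = list(pool.map(tracked, tasks))  # map preserves task order
    return blocks, peak
```

Running the same eight tasks with ``concurrency=2`` or ``concurrency=8`` yields the same eight blocks in the same order. Only the cap on simultaneously running tasks differs.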