Internal entry point for ``DataSourceV2`` reads. Wires a ``ListFiles → ReadFiles`` logical chain, in which :class:`ListFiles` owns listing (via the datasource's ``FileIndexer``), optional global shuffle (``FileShuffleConfig``), and size-balanced bucketing (``RoundRobinPartitioner``).
(
datasource,
*,
parallelism: int = -1,
num_cpus: Optional[float] = None,
num_gpus: Optional[float] = None,
memory: Optional[float] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
concurrency: Optional[int] = None,
compute: Optional[ComputeStrategy] = None,
partition_filter: Optional[PathPartitionFilter] = None,
block_udf: Optional[Callable[[Any], Any]] = None,
)
| 424 | |
| 425 | @wrap_auto_init |
| 426 | def _read_datasource_v2( |
| 427 | datasource, |
| 428 | *, |
| 429 | parallelism: int = -1, |
| 430 | num_cpus: Optional[float] = None, |
| 431 | num_gpus: Optional[float] = None, |
| 432 | memory: Optional[float] = None, |
| 433 | ray_remote_args: Optional[Dict[str, Any]] = None, |
| 434 | concurrency: Optional[int] = None, |
| 435 | compute: Optional[ComputeStrategy] = None, |
| 436 | partition_filter: Optional[PathPartitionFilter] = None, |
| 437 | block_udf: Optional[Callable[[Any], Any]] = None, |
| 438 | ) -> Dataset: |
| 439 | """Internal entry point for ``DataSourceV2`` reads. |
| 440 | |
| 441 | Wires a ``ListFiles → ReadFiles`` logical chain: |
| 442 | |
| 443 | - :class:`ListFiles` owns listing (via the datasource's ``FileIndexer``), |
| 444 | optional global shuffle (``FileShuffleConfig``), and size-balanced |
| 445 | bucketing (``RoundRobinPartitioner``). Its physical planner |
| 446 | parallelizes listing across path shards and emits manifest blocks. |
| 447 | - :class:`ReadFiles` consumes the manifest blocks and reads each bucket |
| 448 | via ``scanner.create_reader().read(manifest)``. |
| 449 | |
| 450 | Schema inference happens once on the driver by sampling the first |
| 451 | file — no caching layer needed. |
| 452 | """ |
| 453 | import time |
| 454 | |
| 455 | from ray.data._internal.datasource_v2.listing.listing_utils import ( |
| 456 | _build_pruners, |
| 457 | sample_files, |
| 458 | ) |
| 459 | from ray.data._internal.datasource_v2.partitioners.round_robin_partitioner import ( |
| 460 | RoundRobinPartitioner, |
| 461 | ) |
| 462 | from ray.data.datasource.file_based_datasource import FileShuffleConfig |
| 463 | |
| 464 | ctx = DataContext.get_current() |
| 465 | |
| 466 | if not datasource.supports_distributed_reads: |
| 467 | import ray.util.client |
| 468 | |
| 469 | if ray.util.client.ray.is_connected(): |
| 470 | raise ValueError( |
| 471 | "Because you're using Ray Client, read tasks scheduled on the " |
| 472 | "Ray cluster can't access your local files. To fix this issue, " |
| 473 | "store files in cloud storage or a distributed filesystem like " |
| 474 | "NFS." |
| 475 | ) |
| 476 | |
| 477 | ray_remote_args = _resolve_read_remote_args( |
| 478 | datasource, |
| 479 | ray_remote_args, |
| 480 | num_cpus, |
| 481 | num_gpus, |
| 482 | memory, |
| 483 | ctx, |
no test coverage detected
searching dependent graphs…