github.com/ray-project/ray

Function _read_datasource_v2

python/ray/data/read_api.py:426–588

Internal entry point for ``DataSourceV2`` reads. Wires a ``ListFiles → ReadFiles`` logical chain: :class:`ListFiles` owns listing (via the datasource's ``FileIndexer``), optional global shuffle (``FileShuffleConfig``), and size-balanced bucketing (``RoundRobinPartitioner``); :class:`ReadFiles` then reads each bucket.
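To make the bucketing idea concrete, here is a minimal sketch in plain Python. It is not Ray's ``RoundRobinPartitioner``: it assumes that "size-balanced" means each file is assigned to whichever bucket currently holds the fewest bytes (a greedy heuristic swapped in for illustration), and that the optional global shuffle is a single seeded permutation applied before bucketing. ``FileEntry`` and ``bucket_files`` are hypothetical names.

```python
import heapq
import random
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class FileEntry:
    # Hypothetical manifest entry: a file path plus its size in bytes.
    path: str
    size: int


def bucket_files(
    files: List[FileEntry],
    num_buckets: int,
    shuffle_seed: Optional[int] = None,
) -> List[List[FileEntry]]:
    """Split files into num_buckets lists with roughly equal total bytes.

    Illustrative sketch only (NOT Ray's RoundRobinPartitioner): an optional
    seeded shuffle, then a greedy "assign to the lightest bucket" pass.
    """
    if num_buckets < 1:
        raise ValueError("num_buckets must be >= 1")
    ordered = list(files)
    if shuffle_seed is not None:
        # Global shuffle: one seeded permutation over every listed file.
        random.Random(shuffle_seed).shuffle(ordered)
    buckets: List[List[FileEntry]] = [[] for _ in range(num_buckets)]
    # Min-heap of (bytes_assigned, bucket_index); ties go to the lower index.
    heap = [(0, i) for i in range(num_buckets)]
    heapq.heapify(heap)
    for entry in ordered:
        total, idx = heapq.heappop(heap)
        buckets[idx].append(entry)
        heapq.heappush(heap, (total + entry.size, idx))
    return buckets
```

For example, six files of 90, 10, 40, 40, 20, and 60 bytes dealt into three buckets without shuffling end up with totals of 90, 110, and 60 bytes. Sorting largest-first before the greedy pass would usually balance better, at the cost of discarding the shuffled order.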

(
    datasource,
    *,
    parallelism: int = -1,
    num_cpus: Optional[float] = None,
    num_gpus: Optional[float] = None,
    memory: Optional[float] = None,
    ray_remote_args: Optional[Dict[str, Any]] = None,
    concurrency: Optional[int] = None,
    compute: Optional[ComputeStrategy] = None,
    partition_filter: Optional[PathPartitionFilter] = None,
    block_udf: Optional[Callable[[Any], Any]] = None,
)
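Four of these keyword arguments (``num_cpus``, ``num_gpus``, ``memory``, ``ray_remote_args``) are forwarded to ``_resolve_read_remote_args``, whose body is not shown on this page. As a hedged sketch of what such a merge step commonly does, the function below assumes that an explicitly passed resource argument overrides the same key in ``ray_remote_args`` and that the caller's dict is copied rather than mutated. ``merge_read_resources`` is a hypothetical name, not a Ray API; the real helper also receives ``datasource`` and ``ctx`` and may apply further defaults.

```python
from typing import Any, Dict, Optional


def merge_read_resources(
    num_cpus: Optional[float],
    num_gpus: Optional[float],
    memory: Optional[float],
    ray_remote_args: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
    """Hypothetical merge of per-read resource kwargs into Ray remote args."""
    # Copy so the caller's dict is never mutated.
    merged: Dict[str, Any] = dict(ray_remote_args or {})
    # Explicit keyword arguments win over keys already present in the dict;
    # None means "not specified" and leaves any existing value untouched.
    for key, value in (("num_cpus", num_cpus), ("num_gpus", num_gpus), ("memory", memory)):
        if value is not None:
            merged[key] = value
    return merged
```

Under these assumptions, ``merge_read_resources(2, None, None, {"num_cpus": 1, "max_retries": 3})`` yields ``{"num_cpus": 2, "max_retries": 3}``.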


@wrap_auto_init
def _read_datasource_v2(
    datasource,
    *,
    parallelism: int = -1,
    num_cpus: Optional[float] = None,
    num_gpus: Optional[float] = None,
    memory: Optional[float] = None,
    ray_remote_args: Optional[Dict[str, Any]] = None,
    concurrency: Optional[int] = None,
    compute: Optional[ComputeStrategy] = None,
    partition_filter: Optional[PathPartitionFilter] = None,
    block_udf: Optional[Callable[[Any], Any]] = None,
) -> Dataset:
    """Internal entry point for ``DataSourceV2`` reads.

    Wires a ``ListFiles → ReadFiles`` logical chain:

    - :class:`ListFiles` owns listing (via the datasource's ``FileIndexer``),
      optional global shuffle (``FileShuffleConfig``), and size-balanced
      bucketing (``RoundRobinPartitioner``). Its physical planner
      parallelizes listing across path shards and emits manifest blocks.
    - :class:`ReadFiles` consumes the manifest blocks and reads each bucket
      via ``scanner.create_reader().read(manifest)``.

    Schema inference happens once on the driver by sampling the first
    file — no caching layer needed.
    """
    import time

    from ray.data._internal.datasource_v2.listing.listing_utils import (
        _build_pruners,
        sample_files,
    )
    from ray.data._internal.datasource_v2.partitioners.round_robin_partitioner import (
        RoundRobinPartitioner,
    )
    from ray.data.datasource.file_based_datasource import FileShuffleConfig

    ctx = DataContext.get_current()

    if not datasource.supports_distributed_reads:
        import ray.util.client

        if ray.util.client.ray.is_connected():
            raise ValueError(
                "Because you're using Ray Client, read tasks scheduled on the "
                "Ray cluster can't access your local files. To fix this issue, "
                "store files in cloud storage or a distributed filesystem like "
                "NFS."
            )

    ray_remote_args = _resolve_read_remote_args(
        datasource,
        ray_remote_args,
        num_cpus,
        num_gpus,
        memory,
        ctx,
        # ... (source listing truncated here; see read_api.py:426–588 for the full function)
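The docstring's two-stage design, where listing emits manifest blocks and reading consumes one bucket at a time, can be modeled without Ray. This toy sketch assumes an in-memory dict in place of a filesystem, a thread pool in place of Ray tasks, row counts in place of byte sizes, and a positional round-robin split into buckets; ``list_shard``, ``infer_schema``, ``read_bucket``, and ``run`` are hypothetical names. Schema inference mirrors the docstring by inspecting only the first listed file.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Tuple

# Hypothetical in-memory "filesystem": path -> rows (each row is a dict).
FAKE_FS: Dict[str, List[Dict[str, int]]] = {
    "data/a/0.parquet": [{"x": 1}, {"x": 2}],
    "data/a/1.parquet": [{"x": 3}],
    "data/b/0.parquet": [{"x": 4}, {"x": 5}, {"x": 6}],
}

# Manifest entry: (path, row count used as a stand-in for byte size).
Manifest = List[Tuple[str, int]]


def list_shard(prefix: str) -> Manifest:
    # Stage 1 (listing): each shard lists only the paths under its prefix.
    return sorted((p, len(rows)) for p, rows in FAKE_FS.items() if p.startswith(prefix))


def infer_schema(manifest: Manifest) -> List[str]:
    # Done once on the "driver" by sampling only the first listed file.
    if not manifest:
        return []
    first_path = manifest[0][0]
    return sorted(FAKE_FS[first_path][0].keys())


def read_bucket(bucket: Manifest) -> List[Dict[str, int]]:
    # Stage 2 (reading): one reader consumes one manifest bucket.
    rows: List[Dict[str, int]] = []
    for path, _size in bucket:
        rows.extend(FAKE_FS[path])
    return rows


def run(prefixes: List[str], num_buckets: int) -> Tuple[List[str], List[Dict[str, int]]]:
    with ThreadPoolExecutor() as pool:
        # Listing is parallelized across path shards, then concatenated.
        manifest: Manifest = [e for shard in pool.map(list_shard, prefixes) for e in shard]
        schema = infer_schema(manifest)
        # Deal manifest entries into buckets round-robin by position.
        buckets = [manifest[i::num_buckets] for i in range(num_buckets)]
        # pool.map preserves bucket order in its results.
        rows = [row for bucket_rows in pool.map(read_bucket, buckets) for row in bucket_rows]
    return schema, rows
```

Calling ``run(["data/a/", "data/b/"], num_buckets=2)`` lists the two prefixes in parallel, infers the schema ``["x"]`` from the first file, and reads all six rows across two buckets.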

Callers 1

read_parquet · Function · 0.85

Calls 15

_build_pruners · Function · 0.90
sample_files · Function · 0.90
ListFiles · Class · 0.90
ReadFiles · Class · 0.90
DatasetStats · Class · 0.90
LogicalPlan · Class · 0.90
Dataset · Class · 0.90
list · Function · 0.85
copy · Method · 0.65

Tested by

no test coverage detected
