MCPcopy
hub / github.com/ray-project/ray / __init__

Method __init__

python/ray/data/random_access_dataset.py:38–105  ·  view source on GitHub ↗

Construct a RandomAccessDataset (internal API). The constructor is a private API. Use ``ds.to_random_access_dataset()`` to construct a RandomAccessDataset.

(
        self,
        ds: "Dataset",
        key: str,
        num_workers: int,
    )

Source from the content-addressed store, hash-verified

36 """
37
38 def __init__(
39 self,
40 ds: "Dataset",
41 key: str,
42 num_workers: int,
43 ):
44 """Construct a RandomAccessDataset (internal API).
45
46 The constructor is a private API. Use ``ds.to_random_access_dataset()``
47 to construct a RandomAccessDataset.
48 """
49 schema = ds.schema(fetch_if_missing=True)
50 if schema is None or isinstance(schema, type):
51 raise ValueError("RandomAccessDataset only supports Arrow-format blocks.")
52
53 start = time.perf_counter()
54 logger.info("[setup] Indexing dataset by sort key.")
55 sorted_ds = ds.sort(key)
56 ctx_label_selector = DataContext.get_current().execution_options.label_selector
57 get_bounds = cached_remote_fn(_get_bounds)
58 if ctx_label_selector:
59 get_bounds = get_bounds.options(label_selector=ctx_label_selector)
60 bundles = sorted_ds.iter_internal_ref_bundles()
61 blocks = _ref_bundles_iterator_to_block_refs_list(bundles)
62
63 logger.info("[setup] Computing block range bounds.")
64 bounds = ray.get([get_bounds.remote(b, key) for b in blocks])
65 self._non_empty_blocks = []
66 self._lower_bound = None
67 self._upper_bounds = []
68 for i, b in enumerate(bounds):
69 if b:
70 self._non_empty_blocks.append(blocks[i])
71 if self._lower_bound is None:
72 self._lower_bound = b[0]
73 self._upper_bounds.append(b[1])
74
75 logger.info("[setup] Creating {} random access workers.".format(num_workers))
76 ctx = DataContext.get_current()
77 worker_options = {"scheduling_strategy": ctx.scheduling_strategy}
78 if ctx_label_selector:
79 worker_options["label_selector"] = ctx_label_selector
80 self._workers = [
81 _RandomAccessWorker.options(**worker_options).remote(key)
82 for _ in range(num_workers)
83 ]
84 (
85 self._block_to_workers_map,
86 self._worker_to_blocks_map,
87 ) = self._compute_block_to_worker_assignments()
88
89 logger.info(
90 "[setup] Worker to blocks assignment: {}".format(self._worker_to_blocks_map)
91 )
92 ray.get(
93 [
94 w.assign_blocks.remote(
95 {

Callers

nothing calls this directly

Calls 14

cached_remote_fn — Function · 0.90
range — Function · 0.70
get — Method · 0.65
schema — Method · 0.45
info — Method · 0.45
sort — Method · 0.45
get_current — Method · 0.45
options — Method · 0.45
remote — Method · 0.45

Tested by

no test coverage detected