Construct a RandomAccessDataset (internal API). The constructor is private and should not be called directly; use ``ds.to_random_access_dataset()`` to create a RandomAccessDataset instead.
(
self,
ds: "Dataset",
key: str,
num_workers: int,
)
| 36 | """ |
| 37 | |
| 38 | def __init__( |
| 39 | self, |
| 40 | ds: "Dataset", |
| 41 | key: str, |
| 42 | num_workers: int, |
| 43 | ): |
| 44 | """Construct a RandomAccessDataset (internal API). |
| 45 | |
| 46 | The constructor is a private API. Use ``ds.to_random_access_dataset()`` |
| 47 | to construct a RandomAccessDataset. |
| 48 | """ |
| 49 | schema = ds.schema(fetch_if_missing=True) |
| 50 | if schema is None or isinstance(schema, type): |
| 51 | raise ValueError("RandomAccessDataset only supports Arrow-format blocks.") |
| 52 | |
| 53 | start = time.perf_counter() |
| 54 | logger.info("[setup] Indexing dataset by sort key.") |
| 55 | sorted_ds = ds.sort(key) |
| 56 | ctx_label_selector = DataContext.get_current().execution_options.label_selector |
| 57 | get_bounds = cached_remote_fn(_get_bounds) |
| 58 | if ctx_label_selector: |
| 59 | get_bounds = get_bounds.options(label_selector=ctx_label_selector) |
| 60 | bundles = sorted_ds.iter_internal_ref_bundles() |
| 61 | blocks = _ref_bundles_iterator_to_block_refs_list(bundles) |
| 62 | |
| 63 | logger.info("[setup] Computing block range bounds.") |
| 64 | bounds = ray.get([get_bounds.remote(b, key) for b in blocks]) |
| 65 | self._non_empty_blocks = [] |
| 66 | self._lower_bound = None |
| 67 | self._upper_bounds = [] |
| 68 | for i, b in enumerate(bounds): |
| 69 | if b: |
| 70 | self._non_empty_blocks.append(blocks[i]) |
| 71 | if self._lower_bound is None: |
| 72 | self._lower_bound = b[0] |
| 73 | self._upper_bounds.append(b[1]) |
| 74 | |
| 75 | logger.info("[setup] Creating {} random access workers.".format(num_workers)) |
| 76 | ctx = DataContext.get_current() |
| 77 | worker_options = {"scheduling_strategy": ctx.scheduling_strategy} |
| 78 | if ctx_label_selector: |
| 79 | worker_options["label_selector"] = ctx_label_selector |
| 80 | self._workers = [ |
| 81 | _RandomAccessWorker.options(**worker_options).remote(key) |
| 82 | for _ in range(num_workers) |
| 83 | ] |
| 84 | ( |
| 85 | self._block_to_workers_map, |
| 86 | self._worker_to_blocks_map, |
| 87 | ) = self._compute_block_to_worker_assignments() |
| 88 | |
| 89 | logger.info( |
| 90 | "[setup] Worker to blocks assignment: {}".format(self._worker_to_blocks_map) |
| 91 | ) |
| 92 | ray.get( |
| 93 | [ |
| 94 | w.assign_blocks.remote( |
| 95 | { |
Nothing calls this constructor directly.
No test coverage was detected for it.