MCPcopy Index your code
hub / github.com/ray-project/ray / _split_all_blocks

Function _split_all_blocks

python/ray/data/_internal/split.py:155–217  ·  view source on GitHub ↗

Split all the input blocks based on the split indices

(
    blocks_with_metadata: List[Tuple[ObjectRef[Block], BlockMetadata]],
    per_block_split_indices: List[List[int]],
    owned_by_consumer: bool,
    label_selector: Optional[Dict[str, str]] = None,
)

Source from the content-addressed store, hash-verified

153
154
155def _split_all_blocks(
156 blocks_with_metadata: List[Tuple[ObjectRef[Block], BlockMetadata]],
157 per_block_split_indices: List[List[int]],
158 owned_by_consumer: bool,
159 label_selector: Optional[Dict[str, str]] = None,
160) -> Iterable[Tuple[ObjectRef[Block], BlockMetadata]]:
161 """Split all the input blocks based on the split indices"""
162 split_single_block = cached_remote_fn(_split_single_block)
163
164 all_blocks_split_results: List[BlockPartition] = [None] * len(blocks_with_metadata)
165
166 per_block_split_metadata_futures = []
167 per_block_split_block_refs = []
168
169 # tracking splitted blocks for gc.
170 blocks_splitted = []
171 for block_id, block_split_indices in enumerate(per_block_split_indices):
172 (block_ref, meta) = blocks_with_metadata[block_id]
173 block_row = meta.num_rows
174 block_split_indices = _drop_empty_block_split(block_split_indices, block_row)
175 if len(block_split_indices) == 0:
176 # optimization: if no split is needed, we just need to add it to the
177 # result
178 all_blocks_split_results[block_id] = [(block_ref, meta)]
179 else:
180 # otherwise call split remote function.
181 options = {
182 "scheduling_strategy": "SPREAD",
183 "num_returns": 2 + len(block_split_indices),
184 }
185 if label_selector:
186 options["label_selector"] = label_selector
187 object_refs = split_single_block.options(**options).remote(
188 block_id,
189 block_ref,
190 meta,
191 block_split_indices,
192 )
193 per_block_split_metadata_futures.append(object_refs[0])
194 per_block_split_block_refs.append(object_refs[1:])
195
196 blocks_splitted.append(block_ref)
197
198 if per_block_split_metadata_futures:
199 # only get metadata.
200 per_block_split_metadata = ray.get(per_block_split_metadata_futures)
201 for (block_id, meta), block_refs in zip(
202 per_block_split_metadata, per_block_split_block_refs
203 ):
204 assert len(meta) == len(block_refs)
205 all_blocks_split_results[block_id] = zip(block_refs, meta)
206
207 # We make a copy for the blocks that have been splitted, so the input blocks
208 # can be cleared if they are owned by consumer (consumer-owned blocks will
209 # only be consumed by the owner).
210 if owned_by_consumer:
211 for b in blocks_splitted:
212 trace_deallocation(b, "split._split_all_blocks")

Callers 1

_split_at_indices · Function · 0.85

Calls 7

cached_remote_fn · Function · 0.90
trace_deallocation · Function · 0.90
_drop_empty_block_split · Function · 0.85
get · Method · 0.65
remote · Method · 0.45
options · Method · 0.45
append · Method · 0.45

Tested by

no test coverage detected

Used in the wild: real call sites across dependent graphs

searching dependent graphs…