Split all the input blocks based on their per-block split indices.
(
blocks_with_metadata: List[Tuple[ObjectRef[Block], BlockMetadata]],
per_block_split_indices: List[List[int]],
owned_by_consumer: bool,
label_selector: Optional[Dict[str, str]] = None,
)
| 153 | |
| 154 | |
| 155 | def _split_all_blocks( |
| 156 | blocks_with_metadata: List[Tuple[ObjectRef[Block], BlockMetadata]], |
| 157 | per_block_split_indices: List[List[int]], |
| 158 | owned_by_consumer: bool, |
| 159 | label_selector: Optional[Dict[str, str]] = None, |
| 160 | ) -> Iterable[Tuple[ObjectRef[Block], BlockMetadata]]: |
| 161 | """Split all the input blocks based on the split indices""" |
| 162 | split_single_block = cached_remote_fn(_split_single_block) |
| 163 | |
| 164 | all_blocks_split_results: List[BlockPartition] = [None] * len(blocks_with_metadata) |
| 165 | |
| 166 | per_block_split_metadata_futures = [] |
| 167 | per_block_split_block_refs = [] |
| 168 | |
| 169 | # tracking splitted blocks for gc. |
| 170 | blocks_splitted = [] |
| 171 | for block_id, block_split_indices in enumerate(per_block_split_indices): |
| 172 | (block_ref, meta) = blocks_with_metadata[block_id] |
| 173 | block_row = meta.num_rows |
| 174 | block_split_indices = _drop_empty_block_split(block_split_indices, block_row) |
| 175 | if len(block_split_indices) == 0: |
| 176 | # optimization: if no split is needed, we just need to add it to the |
| 177 | # result |
| 178 | all_blocks_split_results[block_id] = [(block_ref, meta)] |
| 179 | else: |
| 180 | # otherwise call split remote function. |
| 181 | options = { |
| 182 | "scheduling_strategy": "SPREAD", |
| 183 | "num_returns": 2 + len(block_split_indices), |
| 184 | } |
| 185 | if label_selector: |
| 186 | options["label_selector"] = label_selector |
| 187 | object_refs = split_single_block.options(**options).remote( |
| 188 | block_id, |
| 189 | block_ref, |
| 190 | meta, |
| 191 | block_split_indices, |
| 192 | ) |
| 193 | per_block_split_metadata_futures.append(object_refs[0]) |
| 194 | per_block_split_block_refs.append(object_refs[1:]) |
| 195 | |
| 196 | blocks_splitted.append(block_ref) |
| 197 | |
| 198 | if per_block_split_metadata_futures: |
| 199 | # only get metadata. |
| 200 | per_block_split_metadata = ray.get(per_block_split_metadata_futures) |
| 201 | for (block_id, meta), block_refs in zip( |
| 202 | per_block_split_metadata, per_block_split_block_refs |
| 203 | ): |
| 204 | assert len(meta) == len(block_refs) |
| 205 | all_blocks_split_results[block_id] = zip(block_refs, meta) |
| 206 | |
| 207 | # We make a copy for the blocks that have been splitted, so the input blocks |
| 208 | # can be cleared if they are owned by consumer (consumer-owned blocks will |
| 209 | # only be consumed by the owner). |
| 210 | if owned_by_consumer: |
| 211 | for b in blocks_splitted: |
| 212 | trace_deallocation(b, "split._split_all_blocks") |
no test coverage detected
searching dependent graphs…