(chunks, indexer, axis, in_name, out_name, token)
| 179 | |
| 180 | |
| 181 | def _shuffle(chunks, indexer, axis, in_name, out_name, token): |
| 182 | _validate_indexer(chunks, indexer, axis) |
| 183 | |
| 184 | if len(indexer) == len(chunks[axis]): |
| 185 | # check if the array is already shuffled the way we want |
| 186 | ctr = 0 |
| 187 | for idx, c in zip(indexer, chunks[axis]): |
| 188 | if idx != list(range(ctr, ctr + c)): |
| 189 | break |
| 190 | ctr += c |
| 191 | else: |
| 192 | return chunks, {} |
| 193 | |
| 194 | chunksize_tolerance = config.get("array.chunk-size-tolerance") |
| 195 | chunk_size_limit = int(sum(chunks[axis]) / len(chunks[axis]) * chunksize_tolerance) |
| 196 | |
| 197 | # Figure out how many groups we can put into one chunk |
| 198 | current_chunk, new_chunks = [], [] |
| 199 | for idx in indexer: |
| 200 | if len(current_chunk) + len(idx) > chunk_size_limit and len(current_chunk) > 0: |
| 201 | new_chunks.append(current_chunk) |
| 202 | current_chunk = idx.copy() |
| 203 | else: |
| 204 | current_chunk.extend(idx) |
| 205 | if len(current_chunk) > chunk_size_limit / chunksize_tolerance: |
| 206 | new_chunks.append(current_chunk) |
| 207 | current_chunk = [] |
| 208 | if len(current_chunk) > 0: |
| 209 | new_chunks.append(current_chunk) |
| 210 | |
| 211 | # force 64 bit to avoid potential integer overflows on win32 and numpy<2 |
| 212 | chunk_boundaries = np.cumsum(np.array(chunks[axis], dtype="uint64")) |
| 213 | |
| 214 | # Get existing chunk tuple locations |
| 215 | chunk_tuples = list( |
| 216 | product(*(range(len(c)) for i, c in enumerate(chunks) if i != axis)) |
| 217 | ) |
| 218 | |
| 219 | intermediates = dict() |
| 220 | merges = dict() |
| 221 | dtype = np.min_scalar_type(max(*chunks[axis], chunk_size_limit)) |
| 222 | split_name = f"shuffle-split-{token}" |
| 223 | slices = [slice(None)] * len(chunks) |
| 224 | split_name_suffixes = count() |
| 225 | sorter_name = "shuffle-sorter-" |
| 226 | taker_name = "shuffle-taker-" |
| 227 | |
| 228 | old_blocks = { |
| 229 | old_index: (in_name,) + old_index |
| 230 | for old_index in np.ndindex(tuple([len(c) for c in chunks])) |
| 231 | } |
| 232 | for new_chunk_idx, new_chunk_taker in enumerate(new_chunks): |
| 233 | new_chunk_taker = np.array(new_chunk_taker) |
| 234 | sorter = np.argsort(new_chunk_taker).astype(dtype) |
| 235 | sorter_key = None |
| 236 | |
| 237 | sorted_array = new_chunk_taker[sorter] |
| 238 | source_chunk_nr, taker_boundary = np.unique( |
no test coverage detected
searching dependent graphs…