hub / github.com/dask/dask / _shuffle

Function _shuffle

dask/array/_shuffle.py:181–310
(chunks, indexer, axis, in_name, out_name, token)

Source from the content-addressed store, hash-verified

def _shuffle(chunks, indexer, axis, in_name, out_name, token):
    _validate_indexer(chunks, indexer, axis)

    if len(indexer) == len(chunks[axis]):
        # check if the array is already shuffled the way we want
        ctr = 0
        for idx, c in zip(indexer, chunks[axis]):
            if idx != list(range(ctr, ctr + c)):
                break
            ctr += c
        else:
            return chunks, {}

    chunksize_tolerance = config.get("array.chunk-size-tolerance")
    chunk_size_limit = int(sum(chunks[axis]) / len(chunks[axis]) * chunksize_tolerance)

    # Figure out how many groups we can put into one chunk
    current_chunk, new_chunks = [], []
    for idx in indexer:
        if len(current_chunk) + len(idx) > chunk_size_limit and len(current_chunk) > 0:
            new_chunks.append(current_chunk)
            current_chunk = idx.copy()
        else:
            current_chunk.extend(idx)
            if len(current_chunk) > chunk_size_limit / chunksize_tolerance:
                new_chunks.append(current_chunk)
                current_chunk = []
    if len(current_chunk) > 0:
        new_chunks.append(current_chunk)

    # force 64 bit to avoid potential integer overflows on win32 and numpy<2
    chunk_boundaries = np.cumsum(np.array(chunks[axis], dtype="uint64"))

    # Get existing chunk tuple locations
    chunk_tuples = list(
        product(*(range(len(c)) for i, c in enumerate(chunks) if i != axis))
    )

    intermediates = dict()
    merges = dict()
    dtype = np.min_scalar_type(max(*chunks[axis], chunk_size_limit))
    split_name = f"shuffle-split-{token}"
    slices = [slice(None)] * len(chunks)
    split_name_suffixes = count()
    sorter_name = "shuffle-sorter-"
    taker_name = "shuffle-taker-"

    old_blocks = {
        old_index: (in_name,) + old_index
        for old_index in np.ndindex(tuple([len(c) for c in chunks]))
    }
    for new_chunk_idx, new_chunk_taker in enumerate(new_chunks):
        new_chunk_taker = np.array(new_chunk_taker)
        sorter = np.argsort(new_chunk_taker).astype(dtype)
        sorter_key = None

        sorted_array = new_chunk_taker[sorter]
        source_chunk_nr, taker_boundary = np.unique(
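
The early return near the top of the listing relies on Python's for/else: the else branch runs only when the loop finishes without hitting break, meaning every group in the indexer matches the consecutive positions of its chunk. The sketch below isolates that check as a standalone function. The name is_already_ordered and the plain list/tuple inputs are assumptions made for illustration and are not part of dask.

```python
def is_already_ordered(indexer, axis_chunks):
    """Return True when group i equals the consecutive positions of chunk i.

    Hypothetical helper: it mirrors the for/else check in the listing,
    but returns a boolean instead of (chunks, {}).
    """
    if len(indexer) != len(axis_chunks):
        return False
    start = 0
    for group, size in zip(indexer, axis_chunks):
        # Each group must be exactly [start, start + 1, ..., start + size - 1].
        if list(group) != list(range(start, start + size)):
            return False
        start += size
    return True


print(is_already_ordered([[0, 1], [2, 3, 4]], (2, 3)))  # identity layout
print(is_already_ordered([[1, 0], [2, 3, 4]], (2, 3)))  # first group reordered
```

When the check succeeds, the listing returns the input chunks with an empty graph, so no shuffle tasks are created for an indexer that leaves the array unchanged.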

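The grouping loop in the listing packs indexer groups into output chunks without ever splitting a group. A minimal sketch of that step in plain Python follows, assuming the tolerance is passed in directly rather than read from the array.chunk-size-tolerance config key. The function name group_into_chunks and the value 1.25 are illustrative assumptions.

```python
def group_into_chunks(indexer, axis_chunks, tolerance=1.25):
    """Pack indexer groups into output chunks (hypothetical standalone helper).

    tolerance stands in for config.get("array.chunk-size-tolerance");
    1.25 is only an illustrative value here.
    """
    average = sum(axis_chunks) / len(axis_chunks)
    limit = int(average * tolerance)

    current, new_chunks = [], []
    for group in indexer:
        if len(current) + len(group) > limit and current:
            # Adding this group would exceed the limit: close the open chunk
            # and start a new one with a copy of the group.
            new_chunks.append(current)
            current = list(group)
        else:
            current.extend(group)
            # Close the chunk once it grows past the average chunk size.
            if len(current) > limit / tolerance:
                new_chunks.append(current)
                current = []
    if current:
        new_chunks.append(current)
    return new_chunks


# Average chunk size is 4, so the limit is int(4 * 1.25) == 5.
print(group_into_chunks([[0, 1], [2, 3], [4, 5, 6], [7]], (4, 4)))
```

A group larger than the limit still ends up in a single output chunk, because the size check only decides when to close the chunk that is currently open.
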
Callers 2

take · Function · 0.90
shuffle · Function · 0.70

Calls 15

DataNode · Class · 0.90
Task · Class · 0.90
TaskRef · Class · 0.90
List · Class · 0.90
max · Function · 0.85
pop · Method · 0.80
_validate_indexer · Function · 0.70
sum · Function · 0.70
count · Function · 0.70
convert_key · Function · 0.70
tokenize · Function · 0.50
get · Method · 0.45

Tested by

no test coverage detected
