MCPcopy Index your code
hub / github.com/dask/dask / Shuffle

Class Shuffle

dask/array/_array_expr/_shuffle.py:47–198  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

45
46
47class Shuffle(ArrayExpr):
48 _parameters = ["array", "indexer", "axis", "name"]
49
50 @functools.cached_property
51 def _meta(self):
52 return self.array._meta
53
54 @functools.cached_property
55 def _name(self):
56 return f"{self.operand('name')}-{self._token}"
57
58 @functools.cached_property
59 def chunks(self):
60 output_chunks = []
61 for i, c in enumerate(self.array.chunks):
62 if i == self.axis:
63 output_chunks.append(tuple(map(len, self._new_chunks)))
64 else:
65 output_chunks.append(c)
66 return tuple(output_chunks)
67
68 @functools.cached_property
69 def _chunksize_tolerance(self):
70 return config.get("array.chunk-size-tolerance")
71
72 @functools.cached_property
73 def _chunk_size_limit(self):
74 return int(
75 sum(self.array.chunks[self.axis])
76 / len(self.array.chunks[self.axis])
77 * self._chunksize_tolerance
78 )
79
80 @functools.cached_property
81 def _new_chunks(self):
82 current_chunk, new_chunks = [], []
83 for idx in copy.deepcopy(self.indexer):
84 if (
85 len(current_chunk) + len(idx) > self._chunk_size_limit
86 and len(current_chunk) > 0
87 ):
88 new_chunks.append(current_chunk)
89 current_chunk = idx.copy()
90 else:
91 current_chunk.extend(idx)
92 if (
93 len(current_chunk)
94 > self._chunk_size_limit / self._chunksize_tolerance
95 ):
96 new_chunks.append(current_chunk)
97 current_chunk = []
98 if len(current_chunk) > 0:
99 new_chunks.append(current_chunk)
100 return new_chunks
101
102 def _layer(self) -> dict:
103 chunks = self.array.chunks
104 axis = self.axis

Callers 1

_shuffle · Function · 0.70

Calls

no outgoing calls

Tested by

no test coverage detected

Used in the wild: real call sites across dependent graphs

searching dependent graphs…