MCPcopy
hub / github.com/dask/dask / _layer

Method _layer

dask/array/_array_expr/_shuffle.py:102–198  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

100 return new_chunks
101
102 def _layer(self) -> dict:
103 chunks = self.array.chunks
104 axis = self.axis
105
106 chunk_boundaries = np.cumsum(chunks[axis])
107
108 # Get existing chunk tuple locations
109 chunk_tuples = list(
110 product(*(range(len(c)) for i, c in enumerate(chunks) if i != axis))
111 )
112
113 intermediates: dict = dict()
114 merges: dict = dict()
115 dtype = np.min_scalar_type(max(*chunks[axis], self._chunk_size_limit))
116 split_name = f"shuffle-split-{self.deterministic_token}"
117 slices = [slice(None)] * len(chunks)
118 split_name_suffixes = count()
119 sorter_name = "shuffle-sorter-"
120 taker_name = "shuffle-taker-"
121
122 old_blocks = {
123 old_index: (self.array._name,) + old_index
124 for old_index in np.ndindex(tuple([len(c) for c in chunks]))
125 }
126
127 for new_chunk_idx, new_chunk_taker in enumerate(self._new_chunks):
128 new_chunk_taker = np.array(new_chunk_taker)
129 sorter = np.argsort(new_chunk_taker).astype(dtype)
130 sorter_key = sorter_name + tokenize(sorter)
131 # low level fusion can't deal with arrays on first position
132 merges[sorter_key] = DataNode(sorter_key, (1, sorter))
133
134 sorted_array = new_chunk_taker[sorter]
135 source_chunk_nr, taker_boundary_ = np.unique(
136 np.searchsorted(chunk_boundaries, sorted_array, side="right"),
137 return_index=True,
138 )
139 taker_boundary: list[int] = taker_boundary_.tolist()
140 taker_boundary.append(len(new_chunk_taker))
141
142 taker_cache: dict = {}
143 for chunk_tuple in chunk_tuples:
144 merge_keys = []
145
146 for c, b_start, b_end in zip(
147 source_chunk_nr, taker_boundary[:-1], taker_boundary[1:]
148 ):
149 # insert our axis chunk id into the chunk_tuple
150 chunk_key = convert_key(chunk_tuple, c, axis)
151 name = (split_name, next(split_name_suffixes))
152 this_slice = slices.copy()
153
154 # Cache the takers to allow de-duplication when serializing
155 # Ugly!
156 if c in taker_cache:
157 taker_key = taker_cache[c]
158 else:
159 this_slice[axis] = (

Callers

nothing calls this directly

Calls 13

DataNode (class) · 0.90
Task (class) · 0.90
TaskRef (class) · 0.90
List (class) · 0.90
max (function) · 0.85
pop (method) · 0.80
convert_key (function) · 0.70
count (function) · 0.50
tokenize (function) · 0.50
cumsum (method) · 0.45
astype (method) · 0.45
unique (method) · 0.45

Tested by

no test coverage detected