(self)
| 100 | return new_chunks |
| 101 | |
| 102 | def _layer(self) -> dict: |
| 103 | chunks = self.array.chunks |
| 104 | axis = self.axis |
| 105 | |
| 106 | chunk_boundaries = np.cumsum(chunks[axis]) |
| 107 | |
| 108 | # Get existing chunk tuple locations |
| 109 | chunk_tuples = list( |
| 110 | product(*(range(len(c)) for i, c in enumerate(chunks) if i != axis)) |
| 111 | ) |
| 112 | |
| 113 | intermediates: dict = dict() |
| 114 | merges: dict = dict() |
| 115 | dtype = np.min_scalar_type(max(*chunks[axis], self._chunk_size_limit)) |
| 116 | split_name = f"shuffle-split-{self.deterministic_token}" |
| 117 | slices = [slice(None)] * len(chunks) |
| 118 | split_name_suffixes = count() |
| 119 | sorter_name = "shuffle-sorter-" |
| 120 | taker_name = "shuffle-taker-" |
| 121 | |
| 122 | old_blocks = { |
| 123 | old_index: (self.array._name,) + old_index |
| 124 | for old_index in np.ndindex(tuple([len(c) for c in chunks])) |
| 125 | } |
| 126 | |
| 127 | for new_chunk_idx, new_chunk_taker in enumerate(self._new_chunks): |
| 128 | new_chunk_taker = np.array(new_chunk_taker) |
| 129 | sorter = np.argsort(new_chunk_taker).astype(dtype) |
| 130 | sorter_key = sorter_name + tokenize(sorter) |
| 131 | # low level fusion can't deal with arrays on first position |
| 132 | merges[sorter_key] = DataNode(sorter_key, (1, sorter)) |
| 133 | |
| 134 | sorted_array = new_chunk_taker[sorter] |
| 135 | source_chunk_nr, taker_boundary_ = np.unique( |
| 136 | np.searchsorted(chunk_boundaries, sorted_array, side="right"), |
| 137 | return_index=True, |
| 138 | ) |
| 139 | taker_boundary: list[int] = taker_boundary_.tolist() |
| 140 | taker_boundary.append(len(new_chunk_taker)) |
| 141 | |
| 142 | taker_cache: dict = {} |
| 143 | for chunk_tuple in chunk_tuples: |
| 144 | merge_keys = [] |
| 145 | |
| 146 | for c, b_start, b_end in zip( |
| 147 | source_chunk_nr, taker_boundary[:-1], taker_boundary[1:] |
| 148 | ): |
| 149 | # insert our axis chunk id into the chunk_tuple |
| 150 | chunk_key = convert_key(chunk_tuple, c, axis) |
| 151 | name = (split_name, next(split_name_suffixes)) |
| 152 | this_slice = slices.copy() |
| 153 | |
| 154 | # Cache the takers to allow de-duplication when serializing |
| 155 | # Ugly! |
| 156 | if c in taker_cache: |
| 157 | taker_key = taker_cache[c] |
| 158 | else: |
| 159 | this_slice[axis] = ( |
nothing calls this directly
no test coverage detected