(self)
| 74 | |
@cached_property
def chunks(self):
    """Return the output chunking as a tuple of per-dimension tuples.

    The result has one entry per index in ``self.out_ind``.

    A mapping from index label to chunk sizes is built first. When
    ``self.align_arrays`` is truthy it is taken from the first element
    returned by ``unify_chunks_expr(*self.args)``. Otherwise it is
    assembled from the ``(array, index)`` pairs in ``self.args``, keeping
    for each index label the chunking with the most blocks.

    Entries in ``self.new_axes`` then overwrite or extend that mapping,
    and ``self.adjust_chunks`` (if non-empty) rewrites the chunk sizes of
    the output dimensions it names.

    Raises
    ------
    ValueError
        If a tuple/list entry of ``adjust_chunks`` does not have the same
        number of blocks as the dimension it applies to.
    NotImplementedError
        If an ``adjust_chunks`` value is neither callable, an integer,
        nor a tuple/list.
    """
    if self.align_arrays:
        # Only the first returned element (index label -> chunks) is used.
        chunks_by_index, _, _ = unify_chunks_expr(*self.args)
    else:
        chunks_by_index = {}
        # ``self.args`` is a flat sequence of alternating (array, index)
        # entries; pairs whose index is None are ignored.
        indexed_args = (
            (arg, ind)
            for arg, ind in toolz.partition(2, self.args)
            if ind is not None
        )
        # For each dimension, use the input chunking that has the most
        # blocks; this keeps broadcasting working as expected, and the
        # number of blocks comes out right when the inputs are consistent.
        for arg, ind in indexed_args:
            for dim_chunks, label in zip(arg.chunks, ind):
                if label in chunks_by_index and len(dim_chunks) <= len(
                    chunks_by_index[label]
                ):
                    continue
                chunks_by_index[label] = dim_chunks

    # New axes replace any existing entry; non-tuple values are wrapped
    # into a one-element tuple.
    for label, size in self.new_axes.items():
        chunks_by_index[label] = size if isinstance(size, tuple) else (size,)

    out_chunks = [chunks_by_index[label] for label in self.out_ind]

    if self.adjust_chunks:
        for position, label in enumerate(self.out_ind):
            if label not in self.adjust_chunks:
                continue
            adjust = self.adjust_chunks[label]
            current = out_chunks[position]
            if callable(adjust):
                # Transform every block size individually.
                out_chunks[position] = tuple(adjust(block) for block in current)
            elif isinstance(adjust, numbers.Integral):
                # Same block count, every block given the fixed size.
                out_chunks[position] = (adjust,) * len(current)
            elif isinstance(adjust, (tuple, list)):
                # Explicit sizes must match the existing block count.
                if len(adjust) != len(current):
                    raise ValueError(
                        f"Dimension {position} has {len(current)} blocks, adjust_chunks "
                        f"specified with {len(adjust)} blocks"
                    )
                out_chunks[position] = tuple(adjust)
            else:
                raise NotImplementedError(
                    "adjust_chunks values must be callable, int, or tuple"
                )

    return tuple(tuple(dim_chunks) for dim_chunks in out_chunks)
| 119 | |
| 120 | @cached_property |
| 121 | def dtype(self): |
nothing calls this directly
no test coverage detected