(
*,
size: int,
chunk_sizes: tuple[int, ...],
preferred_chunk_sizes: int | tuple[int, ...],
)
| 28 | |
| 29 | @lru_cache(maxsize=512) |
| 30 | def _get_breaks_cached( |
| 31 | *, |
| 32 | size: int, |
| 33 | chunk_sizes: tuple[int, ...], |
| 34 | preferred_chunk_sizes: int | tuple[int, ...], |
| 35 | ) -> int | None: |
| 36 | if isinstance(preferred_chunk_sizes, int) and preferred_chunk_sizes == 1: |
| 37 | # short-circuit for the trivial case |
| 38 | return None |
| 39 | # Determine the stop indices of the preferred chunks, but omit the last stop |
| 40 | # (equal to the dim size). In particular, assume that when a sequence |
| 41 | # expresses the preferred chunks, the sequence sums to the size. |
| 42 | preferred_stops = ( |
| 43 | range(preferred_chunk_sizes, size, preferred_chunk_sizes) |
| 44 | if isinstance(preferred_chunk_sizes, int) |
| 45 | else set(itertools.accumulate(preferred_chunk_sizes[:-1])) |
| 46 | ) |
| 47 | |
| 48 | # Gather any stop indices of the specified chunks that are not a stop index |
| 49 | # of a preferred chunk. Again, omit the last stop, assuming that it equals |
| 50 | # the dim size. |
| 51 | actual_stops = itertools.accumulate(chunk_sizes[:-1]) |
| 52 | # This copy is required for parallel iteration |
| 53 | actual_stops_2 = itertools.accumulate(chunk_sizes[:-1]) |
| 54 | |
| 55 | disagrees = itertools.compress( |
| 56 | actual_stops_2, (a not in preferred_stops for a in actual_stops) |
| 57 | ) |
| 58 | try: |
| 59 | return next(disagrees) |
| 60 | except StopIteration: |
| 61 | return None |
| 62 | |
| 63 | |
| 64 | def _maybe_chunk( |
no outgoing calls
no test coverage detected
searching dependent graphs…