(x, window_shape, axis=None, automatic_rechunk=True)
| 819 | |
| 820 | @derived_from(np.lib.stride_tricks) |
| 821 | def sliding_window_view(x, window_shape, axis=None, automatic_rechunk=True): |
| 822 | window_shape = tuple(window_shape) if np.iterable(window_shape) else (window_shape,) |
| 823 | |
| 824 | window_shape_array = np.array(window_shape) |
| 825 | if np.any(window_shape_array <= 0): |
| 826 | raise ValueError("`window_shape` must contain values > 0") |
| 827 | |
| 828 | if axis is None: |
| 829 | axis = tuple(range(x.ndim)) |
| 830 | if len(window_shape) != len(axis): |
| 831 | raise ValueError( |
| 832 | f"Since axis is `None`, must provide " |
| 833 | f"window_shape for all dimensions of `x`; " |
| 834 | f"got {len(window_shape)} window_shape elements " |
| 835 | f"and `x.ndim` is {x.ndim}." |
| 836 | ) |
| 837 | else: |
| 838 | axis = normalize_axis_tuple(axis, x.ndim, allow_duplicate=True) |
| 839 | if len(window_shape) != len(axis): |
| 840 | raise ValueError( |
| 841 | f"Must provide matching length window_shape and " |
| 842 | f"axis; got {len(window_shape)} window_shape " |
| 843 | f"elements and {len(axis)} axes elements." |
| 844 | ) |
| 845 | |
| 846 | depths = [0] * x.ndim |
| 847 | for ax, window in zip(axis, window_shape): |
| 848 | depths[ax] += window - 1 |
| 849 | |
| 850 | # Ensure that each chunk is big enough to leave at least a size-1 chunk |
| 851 | # after windowing (this is only really necessary for the last chunk). |
| 852 | safe_chunks = list( |
| 853 | ensure_minimum_chunksize(d + 1, c) for d, c in zip(depths, x.chunks) |
| 854 | ) |
| 855 | if automatic_rechunk: |
| 856 | safe_chunks = [ |
| 857 | s if d != 0 else c for d, c, s in zip(depths, x.chunks, safe_chunks) |
| 858 | ] |
| 859 | # safe chunks is our output chunks, so add the new dimensions |
| 860 | safe_chunks.extend([(w,) for w in window_shape]) |
| 861 | max_chunk = reduce(mul, map(max, x.chunks)) |
| 862 | new_chunks = _calculate_new_chunksizes( |
| 863 | x.chunks, |
| 864 | safe_chunks.copy(), |
| 865 | {i for i, d in enumerate(depths) if d == 0}, |
| 866 | max_chunk, |
| 867 | ) |
| 868 | x = x.rechunk(tuple(new_chunks)) |
| 869 | else: |
| 870 | x = x.rechunk(tuple(safe_chunks)) |
| 871 | |
| 872 | # result.shape = x_shape_trimmed + window_shape, |
| 873 | # where x_shape_trimmed is x.shape with every entry |
| 874 | # reduced by one less than the corresponding window size. |
| 875 | # trim chunks to match x_shape_trimmed |
| 876 | newchunks = tuple(c[:-1] + (c[-1] - d,) for d, c in zip(depths, x.chunks)) + tuple( |
| 877 | (window,) for window in window_shape |
| 878 | ) |
nothing calls this directly
no test coverage detected
searching dependent graphs…