Apply an ndarray-level function over Variable and/or ndarray objects.
(
func,
*args,
signature: _UFuncSignature,
exclude_dims=frozenset(),
dask="forbidden",
output_dtypes=None,
vectorize=False,
keep_attrs="override",
dask_gufunc_kwargs=None,
)
| 705 | |
| 706 | |
| 707 | def apply_variable_ufunc( |
| 708 | func, |
| 709 | *args, |
| 710 | signature: _UFuncSignature, |
| 711 | exclude_dims=frozenset(), |
| 712 | dask="forbidden", |
| 713 | output_dtypes=None, |
| 714 | vectorize=False, |
| 715 | keep_attrs="override", |
| 716 | dask_gufunc_kwargs=None, |
| 717 | ) -> Variable | tuple[Variable, ...]: |
| 718 | """Apply an ndarray level function over Variable and/or ndarray objects.""" |
| 719 | from xarray.core.formatting import short_array_repr |
| 720 | from xarray.core.variable import as_compatible_data |
| 721 | |
| 722 | dim_sizes = unified_dim_sizes( |
| 723 | (a for a in args if hasattr(a, "dims")), exclude_dims=exclude_dims |
| 724 | ) |
| 725 | broadcast_dims = tuple( |
| 726 | dim for dim in dim_sizes if dim not in signature.all_core_dims |
| 727 | ) |
| 728 | output_dims = [broadcast_dims + out for out in signature.output_core_dims] |
| 729 | |
| 730 | input_data = [ |
| 731 | ( |
| 732 | broadcast_compat_data(arg, broadcast_dims, core_dims) |
| 733 | if isinstance(arg, Variable) |
| 734 | else arg |
| 735 | ) |
| 736 | for arg, core_dims in zip(args, signature.input_core_dims, strict=True) |
| 737 | ] |
| 738 | |
| 739 | if any(is_chunked_array(array) for array in input_data): |
| 740 | if dask == "forbidden": |
| 741 | raise ValueError( |
| 742 | "apply_ufunc encountered a chunked array on an " |
| 743 | "argument, but handling for chunked arrays has not " |
| 744 | "been enabled. Either set the ``dask`` argument " |
| 745 | "or load your data into memory first with " |
| 746 | "``.load()`` or ``.compute()``" |
| 747 | ) |
| 748 | elif dask == "parallelized": |
| 749 | chunkmanager = get_chunked_array_type(*input_data) |
| 750 | |
| 751 | numpy_func = func |
| 752 | |
| 753 | if dask_gufunc_kwargs is None: |
| 754 | dask_gufunc_kwargs = {} |
| 755 | else: |
| 756 | dask_gufunc_kwargs = dask_gufunc_kwargs.copy() |
| 757 | |
| 758 | allow_rechunk = dask_gufunc_kwargs.get("allow_rechunk", None) |
| 759 | if allow_rechunk is None: |
| 760 | for n, (data, core_dims) in enumerate( |
| 761 | zip(input_data, signature.input_core_dims, strict=True) |
| 762 | ): |
| 763 | if is_chunked_array(data): |
| 764 | # core dimensions cannot span multiple chunks |
nothing calls this directly
no test coverage detected
searching dependent graphs…