Creates a task that subsets an xarray dataset to a block determined by chunk_index. Block extents are determined by input_chunk_bounds. Also creates subtasks that subset the constituent variables of the dataset.
(
graph: dict, gname: str, dataset: Dataset, input_chunk_bounds, chunk_index
)
| 157 | |
| 158 | |
| 159 | def subset_dataset_to_block( |
| 160 | graph: dict, gname: str, dataset: Dataset, input_chunk_bounds, chunk_index |
| 161 | ): |
| 162 | """ |
| 163 | Creates a task that subsets an xarray dataset to a block determined by chunk_index. |
| 164 | Block extents are determined by input_chunk_bounds. |
| 165 | Also subtasks that subset the constituent variables of a dataset. |
| 166 | """ |
| 167 | import dask |
| 168 | |
| 169 | # this will become [[name1, variable1], |
| 170 | # [name2, variable2], |
| 171 | # ...] |
| 172 | # which is passed to dict and then to Dataset |
| 173 | data_vars = [] |
| 174 | coords = [] |
| 175 | |
| 176 | chunk_tuple = tuple(chunk_index.values()) |
| 177 | chunk_dims_set = set(chunk_index) |
| 178 | variable: Variable |
| 179 | for name, variable in dataset.variables.items(): |
| 180 | # make a task that creates tuple of (dims, chunk) |
| 181 | if dask.is_dask_collection(variable.data): |
| 182 | # get task name for chunk |
| 183 | chunk = ( |
| 184 | variable.data.name, |
| 185 | *tuple(chunk_index[dim] for dim in variable.dims), |
| 186 | ) |
| 187 | |
| 188 | chunk_variable_task = (f"{name}-{gname}-{chunk[0]!r}",) + chunk_tuple |
| 189 | graph[chunk_variable_task] = ( |
| 190 | tuple, |
| 191 | [variable.dims, chunk, variable.attrs], |
| 192 | ) |
| 193 | else: |
| 194 | assert name in dataset.dims or variable.ndim == 0 |
| 195 | |
| 196 | # non-dask array possibly with dimensions chunked on other variables |
| 197 | # index into variable appropriately |
| 198 | subsetter = { |
| 199 | dim: _get_chunk_slicer(dim, chunk_index, input_chunk_bounds) |
| 200 | for dim in variable.dims |
| 201 | } |
| 202 | if set(variable.dims) < chunk_dims_set: |
| 203 | this_var_chunk_tuple = tuple(chunk_index[dim] for dim in variable.dims) |
| 204 | else: |
| 205 | this_var_chunk_tuple = chunk_tuple |
| 206 | |
| 207 | chunk_variable_task = ( |
| 208 | f"{name}-{gname}-{dask.base.tokenize(subsetter)}", |
| 209 | ) + this_var_chunk_tuple |
| 210 | # We are including a dimension coordinate, |
| 211 | # minimize duplication by not copying it in the graph for every chunk. |
| 212 | if variable.ndim == 0 or chunk_variable_task not in graph: |
| 213 | subset = variable.isel(subsetter) |
| 214 | graph[chunk_variable_task] = ( |
| 215 | tuple, |
| 216 | [subset.dims, subset._data, subset.attrs], |
no test coverage detected
searching dependent graphs…