Apply func to all keys of collection. Create a Blockwise layer whenever possible; fall back to MaterializedLayer otherwise. Parameters: func — callable to be invoked on the graph node; prev_name : str — name of the layer to map from; in case of dask base collections, this is the collection name.
(
func: Callable,
prev_name: str,
new_name: str,
collection,
dependencies: tuple[Delayed, ...] = (),
)
| 146 | |
| 147 | |
| 148 | def _build_map_layer( |
| 149 | func: Callable, |
| 150 | prev_name: str, |
| 151 | new_name: str, |
| 152 | collection, |
| 153 | dependencies: tuple[Delayed, ...] = (), |
| 154 | ) -> Layer: |
| 155 | """Apply func to all keys of collection. Create a Blockwise layer whenever possible; |
| 156 | fall back to MaterializedLayer otherwise. |
| 157 | |
| 158 | Parameters |
| 159 | ---------- |
| 160 | func |
| 161 | Callable to be invoked on the graph node |
| 162 | prev_name : str |
| 163 | name of the layer to map from; in case of dask base collections, this is the |
| 164 | collection name. Note how third-party collections, e.g. xarray.Dataset, can |
| 165 | have multiple names. |
| 166 | new_name : str |
| 167 | name of the layer to map to |
| 168 | collection |
| 169 | Arbitrary dask collection |
| 170 | dependencies |
| 171 | Zero or more Delayed objects, which will be passed as arbitrary variadic args to |
| 172 | func after the collection's chunk |
| 173 | """ |
| 174 | if _can_apply_blockwise(collection): |
| 175 | # Use a Blockwise layer |
| 176 | try: |
| 177 | numblocks = collection.numblocks |
| 178 | except AttributeError: |
| 179 | numblocks = (collection.npartitions,) |
| 180 | indices = tuple(i for i, _ in enumerate(numblocks)) |
| 181 | kwargs = ( |
| 182 | {"_deps": List(*[TaskRef(d.key) for d in dependencies])} |
| 183 | if dependencies |
| 184 | else {} |
| 185 | ) |
| 186 | |
| 187 | return blockwise( |
| 188 | func, |
| 189 | new_name, |
| 190 | indices, |
| 191 | prev_name, |
| 192 | indices, |
| 193 | numblocks={prev_name: numblocks}, |
| 194 | dependencies=dependencies, |
| 195 | **kwargs, |
| 196 | ) |
| 197 | else: |
| 198 | # Delayed, bag.Item, dataframe.core.Scalar, or third-party collection; |
| 199 | # fall back to MaterializedLayer |
| 200 | dep_keys = tuple(d.key for d in dependencies) |
| 201 | return MaterializedLayer( |
| 202 | { |
| 203 | replace_name_in_key(k, {prev_name: new_name}): (func, k) + dep_keys |
| 204 | for k in flatten(collection.__dask_keys__()) |
| 205 | if get_name_from_key(k) == prev_name |
no test coverage detected
searching dependent graphs…