(
child: T,
blocker: Delayed | None,
omit_layers: set[str],
omit_keys: set[Key],
seed: Hashable,
)
| 313 | |
| 314 | |
| 315 | def _bind_one( |
| 316 | child: T, |
| 317 | blocker: Delayed | None, |
| 318 | omit_layers: set[str], |
| 319 | omit_keys: set[Key], |
| 320 | seed: Hashable, |
| 321 | ) -> T: |
| 322 | prev_coll_names = get_collection_names(child) |
| 323 | if not prev_coll_names: |
| 324 | # Collection with no keys; this is a legitimate use case but, at the moment of |
| 325 | # writing, can only happen with third-party collections |
| 326 | return child |
| 327 | |
| 328 | dsk = child.__dask_graph__() # type: ignore[attr-defined] |
| 329 | new_layers: dict[str, Layer] = {} |
| 330 | new_deps: dict[str, set[str]] = {} |
| 331 | |
| 332 | if isinstance(dsk, HighLevelGraph): |
| 333 | try: |
| 334 | layers_to_clone = set(child.__dask_layers__()) # type: ignore[attr-defined] |
| 335 | except AttributeError: |
| 336 | layers_to_clone = prev_coll_names.copy() |
| 337 | else: |
| 338 | if len(prev_coll_names) == 1: |
| 339 | hlg_name = next(iter(prev_coll_names)) |
| 340 | else: |
| 341 | hlg_name = tokenize(*prev_coll_names) |
| 342 | dsk = HighLevelGraph.from_collections(hlg_name, dsk) |
| 343 | layers_to_clone = {hlg_name} |
| 344 | |
| 345 | clone_keys = dsk.get_all_external_keys() - omit_keys |
| 346 | for layer_name in omit_layers: |
| 347 | try: |
| 348 | layer = dsk.layers[layer_name] |
| 349 | except KeyError: |
| 350 | continue |
| 351 | clone_keys -= layer.get_output_keys() |
| 352 | # Note: when assume_layers=True, clone_keys can contain keys of the omit collections |
| 353 | # that are not top-level. This is OK, as they will never be encountered inside the |
| 354 | # values of their dependent layers. |
| 355 | |
| 356 | if blocker is not None: |
| 357 | blocker_key = blocker.key |
| 358 | blocker_dsk = blocker.__dask_graph__() |
| 359 | assert isinstance(blocker_dsk, HighLevelGraph) |
| 360 | new_layers.update(blocker_dsk.layers) |
| 361 | new_deps.update(blocker_dsk.dependencies) |
| 362 | else: |
| 363 | blocker_key = None |
| 364 | |
| 365 | layers_to_copy_verbatim = set() |
| 366 | |
| 367 | while layers_to_clone: |
| 368 | prev_layer_name = layers_to_clone.pop() |
| 369 | new_layer_name = clone_key(prev_layer_name, seed=seed) |
| 370 | if new_layer_name in new_layers: |
| 371 | continue |
| 372 |
no test coverage detected
searching dependent graphs…