(
self,
keys: set[Key],
seed: Hashable,
bind_to: Key | None = None,
)
| 744 | return self, culled_deps |
| 745 | |
| 746 | def clone( |
| 747 | self, |
| 748 | keys: set[Key], |
| 749 | seed: Hashable, |
| 750 | bind_to: Key | None = None, |
| 751 | ) -> tuple[Layer, bool]: |
| 752 | names = {get_name_from_key(k) for k in keys} |
| 753 | # We assume that 'keys' will contain either all or none of the output keys of |
| 754 | # each of the layers, because clone/bind are always invoked at collection level. |
| 755 | # Asserting this is very expensive, so we only check it during unit tests. |
| 756 | if "PYTEST_CURRENT_TEST" in os.environ: |
| 757 | assert not self.get_output_keys() - keys |
| 758 | for name, nb in self.numblocks.items(): |
| 759 | if name in names: |
| 760 | for block in product(*(list(range(nbi)) for nbi in nb)): |
| 761 | assert (name, *block) in keys |
| 762 | |
| 763 | is_leaf = True |
| 764 | |
| 765 | indices = [] |
| 766 | k: Key | TaskRef |
| 767 | for k, idxv in self.indices: |
| 768 | # Note: k may not be a key and thus not be hashable in the case where |
| 769 | # one or more args of blockwise() are sequences of literals; |
| 770 | # e.g. k = (list, [0, 1, 2]) |
| 771 | # See https://github.com/dask/dask/issues/8978 |
| 772 | |
| 773 | if ishashable(k) and k in names: |
| 774 | is_leaf = False |
| 775 | k = clone_key(k, seed) # type: ignore[type-var] |
| 776 | elif isinstance(k, TaskRef) and k.key in names: |
| 777 | is_leaf = False |
| 778 | k = TaskRef(clone_key(k.key, seed)) |
| 779 | |
| 780 | indices.append((k, idxv)) |
| 781 | |
| 782 | numblocks: dict[str, Sequence[int]] = {} |
| 783 | for k, nbv in self.numblocks.items(): |
| 784 | if k in names: |
| 785 | is_leaf = False |
| 786 | k = clone_key(k, seed) |
| 787 | numblocks[k] = nbv |
| 788 | |
| 789 | if bind_to is not None and is_leaf: |
| 790 | from dask.graph_manipulation import chunks |
| 791 | |
| 792 | # It's always a Delayed generated by dask.graph_manipulation.checkpoint; |
| 793 | # the layer name always matches the key |
| 794 | assert isinstance(bind_to, str) |
| 795 | newtask = Task( |
| 796 | clone_key(self.task.key, seed), |
| 797 | chunks.bind, |
| 798 | self.task, |
| 799 | TaskRef(blockwise_token(len(indices))), |
| 800 | _data_producer=self.task.data_producer, |
| 801 | ) |
| 802 | indices.append((TaskRef(bind_to), None)) |
| 803 | else: |
no test coverage detected