Remove unnecessary tasks from the layer. In other words, return a new Layer with only the tasks required to calculate `keys` and a map of external key dependencies. Examples -------- >>> inc = lambda x: x + 1 >>> add = lambda x, y: x + y >>> d
(
self, keys: set[Key], all_hlg_keys: Collection[Key]
)
| 125 | return self.keys() # this implementation will materialize the graph |
| 126 | |
def cull(
    self, keys: set[Key], all_hlg_keys: Collection[Key]
) -> tuple[Layer, Mapping[Key, set[Key]]]:
    """Remove unnecessary tasks from the layer

    In other words, return a new Layer with only the tasks required to
    calculate `keys` and a map of external key dependencies.

    Parameters
    ----------
    keys : set
        Keys whose tasks (and, transitively, their in-layer dependencies)
        must be kept. On the legacy-task path, keys that are not part of
        this layer are skipped.
    all_hlg_keys : Collection
        Forwarded to ``get_dependencies`` on the legacy-task path. It is
        not used when the layer has no legacy tasks.

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> add = lambda x, y: x + y
    >>> d = MaterializedLayer({'x': 1, 'y': (inc, 'x'), 'out': (add, 'x', 10)})
    >>> _, deps = d.cull({'out'}, d.keys())
    >>> deps
    {'out': {'x'}, 'x': set()}

    Returns
    -------
    layer: Layer
        Culled layer
    deps: Map
        Map of external key dependencies
    """

    if self.has_legacy_tasks:
        if len(keys) == len(self):
            # Nothing to cull if preserving all existing keys
            # NOTE(review): only the sizes are compared, so this presumably
            # relies on callers passing keys that belong to this layer --
            # confirm against the caller.
            return (
                self,
                {k: self.get_dependencies(k, all_hlg_keys) for k in self.keys()},
            )
        ret_deps = {}
        # Every requested key is already queued in ``work``; marking them as
        # seen up front keeps a key that is also a dependency of another key
        # from being queued (and its dependencies recomputed) a second time.
        seen = set(keys)
        out = {}
        work = keys.copy()  # copy so the caller's set is not consumed
        while work:
            k = work.pop()
            if k not in self:
                # Not a task of this layer: nothing to keep for it here.
                continue
            out[k] = self[k]
            ret_deps[k] = self.get_dependencies(k, all_hlg_keys)
            for d in ret_deps[k]:
                # Only dependencies that live in this layer are traversed.
                if d not in seen and d in self:
                    seen.add(d)
                    work.add(d)

        return MaterializedLayer(out, annotations=self.annotations), ret_deps
    else:
        from dask._task_spec import cull

        # Work on a plain-dict copy of the layer and let the task-spec
        # helper select the tasks; each kept task exposes its own
        # dependencies via ``.dependencies``.
        out = cull(dict(self), keys)
        return MaterializedLayer(out, annotations=self.annotations), {
            k: set(v.dependencies) for k, v in out.items()
        }
| 183 | |
| 184 | def get_dependencies(self, key: Key, all_hlg_keys: Collection[Key]) -> set: |
no test coverage detected