Return new dask with only the tasks required to calculate keys. In other words, remove unnecessary tasks from dask. ``keys`` may be a single key or list of keys. Examples -------- >>> def inc(x): ... return x + 1 >>> def add(x, y): ... return x + y >>>
(dsk, keys)
| 18 | |
| 19 | |
def cull(dsk, keys):
    """Return a new dask graph with only the tasks required to calculate keys.

    In other words, remove unnecessary tasks from the graph.
    ``keys`` may be a single key or a list of keys.

    Parameters
    ----------
    dsk : mapping
        The dask graph, mapping keys to tasks or literal values.
    keys : key, or list/set of keys
        The output key(s) whose computation must be preserved. Anything that
        is not a ``list`` or ``set`` (including a tuple) is treated as a
        single key.

    Examples
    --------
    >>> def inc(x):
    ...     return x + 1

    >>> def add(x, y):
    ...     return x + y

    >>> d = {'x': 1, 'y': (inc, 'x'), 'out': (add, 'x', 10)}
    >>> dsk, dependencies = cull(d, 'out')
    >>> dsk  # doctest: +ELLIPSIS
    {'out': (<function add at ...>, 'x', 10), 'x': 1}
    >>> dependencies  # doctest: +ELLIPSIS
    {'out': ['x'], 'x': []}

    Returns
    -------
    dsk: culled dask graph
    dependencies: Dict mapping {key: [deps]}. Useful side effect to accelerate
        other optimizations, notably fuse.
    """
    # Only lists and sets are unpacked as collections of keys.
    # NOTE(review): tuples are presumably excluded because a tuple such as
    # ('x', 0) is itself a valid key -- confirm.
    if not isinstance(keys, (list, set)):
        keys = [keys]

    # Level-by-level walk from the requested keys toward their dependencies.
    work = list(set(flatten(keys)))
    # Seed ``seen`` with the requested keys so a requested key that is also a
    # dependency of another requested key is not queued and processed twice.
    seen = set(work)
    dependencies = {}
    out = {}

    while work:
        new_work = []
        for k in work:
            dependencies_k = get_dependencies(dsk, k, as_list=True)  # fuse needs lists
            out[k] = dsk[k]
            dependencies[k] = dependencies_k
            for d in dependencies_k:
                if d not in seen:
                    seen.add(d)
                    new_work.append(d)
        work = new_work

    return out, dependencies
| 69 | |
| 70 | |
| 71 | def default_fused_linear_keys_renamer(keys): |
searching dependent graphs…