Execute a given graph. The graph is executed in topological order as defined by dask.order until all leaf nodes, i.e. nodes without any dependents, are reached. The returned dictionary contains the results of the leaf nodes. If keys are required that are not part of the graph, they can be provided in the `cache` argument.
execute_graph(
    dsk: Iterable[GraphNode] | Mapping[KeyType, GraphNode],
    cache: MutableMapping[KeyType, object] | None = None,
    keys: Container[KeyType] | None = None,
)
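
To make the described behaviour concrete, here is a minimal, self-contained sketch of the same steps: reference counting, ordered execution, and eviction of intermediates. It is not dask's implementation. `Node` and `run_graph` are hypothetical names introduced only for illustration, and the standard library's `graphlib.TopologicalSorter` is swapped in for `dask.order`, so the exact execution order may differ from dask's while still respecting dependencies.

```python
from collections import defaultdict
from graphlib import TopologicalSorter


class Node:
    """Hypothetical stand-in for a GraphNode: a key, a function, dependency keys."""

    def __init__(self, key, func, *dependencies):
        self.key = key
        self.func = func
        self.dependencies = tuple(dependencies)

    def __call__(self, cache):
        # Resolve every dependency from the cache, then apply the function.
        return self.func(*(cache[dep] for dep in self.dependencies))


def run_graph(nodes, cache=None, keys=None):
    """Illustrative analogue of execute_graph (assumed names, stdlib ordering)."""
    dsk = {node.key: node for node in nodes}

    # Count how many nodes depend on each key.
    refcount = defaultdict(int)
    for node in dsk.values():
        for dep in node.dependencies:
            refcount[dep] += 1

    cache = {} if cache is None else cache

    # graphlib replaces dask.order here. Dependencies that live only in
    # `cache` are left out of the ordering because they are already computed.
    sorter = TopologicalSorter(
        {key: [d for d in node.dependencies if d in dsk] for key, node in dsk.items()}
    )
    for key in sorter.static_order():
        node = dsk[key]
        cache[key] = node(cache)
        for dep in node.dependencies:
            refcount[dep] -= 1
            # Evict a value once nothing else needs it, but only when
            # `keys` is given and the value was not requested.
            if refcount[dep] == 0 and keys and dep not in keys:
                del cache[dep]
    return cache


nodes = [
    Node("a", lambda x: x + 1, "x"),  # "x" is not in the graph; it comes from `cache`
    Node("b", lambda a: a * 2, "a"),
    Node("c", lambda a, b: a + b, "a", "b"),
]
result = run_graph(nodes, cache={"x": 1}, keys={"c"})
print(result)  # {'c': 6}
```

With `keys={"c"}`, the intermediates `x`, `a`, and `b` are dropped as soon as their reference counts reach zero, so only `c` remains. When `keys` is omitted, the eviction condition never fires and every computed value, along with the supplied cache entries, stays in the returned mapping.
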

def execute_graph(
    dsk: Iterable[GraphNode] | Mapping[KeyType, GraphNode],
    cache: MutableMapping[KeyType, object] | None = None,
    keys: Container[KeyType] | None = None,
) -> MutableMapping[KeyType, object]:
    """Execute a given graph.

    The graph is executed in topological order as defined by dask.order until
    all leaf nodes, i.e. nodes without any dependents, are reached. The returned
    dictionary contains the results of the leaf nodes.

    If keys are required that are not part of the graph, they can be provided in the `cache` argument.

    If `keys` is provided, the result will contain only values that are part of the `keys` set.

    """
    if isinstance(dsk, (list, tuple, set, frozenset)):
        dsk = {t.key: t for t in dsk}
    else:
        assert isinstance(dsk, dict)

    refcount: defaultdict[KeyType, int] = defaultdict(int)
    for vals in DependenciesMapping(dsk).values():
        for val in vals:
            refcount[val] += 1

    cache = cache or {}
    from dask.order import order

    priorities = order(dsk)

    for key, node in sorted(dsk.items(), key=lambda it: priorities[it[0]]):
        cache[key] = node(cache)
        for dep in node.dependencies:
            refcount[dep] -= 1
            if refcount[dep] == 0 and keys and dep not in keys:
                del cache[dep]

    return cache


def fuse_linear_task_spec(dsk, keys):