MCPcopy
hub / github.com/dask/dask / fuse

Method fuse

dask/_task_spec.py:449–496  ·  view source on GitHub ↗

Fuse a set of tasks into a single task. The tasks are fused into a single task that will execute the tasks in a subgraph. The internal tasks are no longer accessible from the outside. All provided tasks must form a valid subgraph that will reduce to a single key. If

(*tasks: GraphNode, key: KeyType | None = None)

Source from the content-addressed store, hash-verified

447
448 @staticmethod
449 def fuse(*tasks: GraphNode, key: KeyType | None = None) -> GraphNode:
450 """Fuse a set of tasks into a single task.
451
452 The tasks are fused into a single task that will execute the tasks in a
453 subgraph. The internal tasks are no longer accessible from the outside.
454
455 All provided tasks must form a valid subgraph that will reduce to a
456 single key. If multiple outputs are possible with the provided tasks, an
457 exception will be raised.
458
459 The tasks will not be rewritten but instead a new Task will be created
460 that will merely reference the old task objects. This way, Task objects
461 may be reused in multiple fused tasks.
462
463 Parameters
464 ----------
465 key : KeyType | None, optional
466 The key of the new Task object. If None provided, the key of the
467 final task will be used.
468
469 See also
470 --------
471 GraphNode.substitute : Easier substitution of dependencies
472 """
473 if any(t.key is None for t in tasks):
474 raise ValueError("Cannot fuse tasks with missing keys")
475 if len(tasks) == 1:
476 return tasks[0].substitute({}, key=key)
477 all_keys = set()
478 all_deps: set[KeyType] = set()
479 for t in tasks:
480 all_deps.update(t.dependencies)
481 all_keys.add(t.key)
482 external_deps = tuple(sorted(all_deps - all_keys, key=hash))
483 leafs = all_keys - all_deps
484 if len(leafs) > 1:
485 raise ValueError(f"Cannot fuse tasks with multiple outputs {leafs}")
486
487 outkey = leafs.pop()
488 return Task(
489 key or outkey,
490 _execute_subgraph,
491 {t.key: t for t in tasks},
492 outkey,
493 external_deps,
494 *(TaskRef(k) for k in external_deps),
495 _data_producer=any(t.data_producer for t in tasks),
496 )
497
498 @classmethod
499 @lru_cache

Callers 1

fuse_linear_task_specFunction · 0.45

Calls 7

anyFunction · 0.85
setClass · 0.85
TaskClass · 0.85
TaskRefClass · 0.85
popMethod · 0.80
substituteMethod · 0.45
addMethod · 0.45

Tested by

no test coverage detected