Fuse a set of tasks into a single task. The fused task executes the provided tasks as a subgraph, and the internal tasks are no longer accessible from the outside. All provided tasks must form a valid subgraph that reduces to a single key. If multiple outputs are possible with the provided tasks, an exception is raised.
(*tasks: GraphNode, key: KeyType | None = None)
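As a rough illustration of the single-output rule described above, the following sketch works on a plain dictionary that maps each task key to the keys it depends on. It is not the library's implementation: `classify_keys` and the dictionary layout are hypothetical names introduced only for this example, and unlike the real method the sketch also rejects the case where no output key exists.

```python
def classify_keys(dependencies):
    """Split a candidate subgraph into external inputs and its single output.

    ``dependencies`` maps each task key to the set of keys it depends on.
    Hypothetical helper for illustration only.
    """
    all_keys = set(dependencies)
    all_deps = set()
    for deps in dependencies.values():
        all_deps.update(deps)

    # Keys that are needed but not produced inside the subgraph.
    external = all_deps - all_keys
    # Keys that are produced but not consumed inside the subgraph.
    outputs = all_keys - all_deps

    # Assumption of this sketch: anything other than one output is an error.
    if len(outputs) != 1:
        raise ValueError(f"Expected exactly one output, got {sorted(outputs)}")
    return external, outputs.pop()


# "b" depends on "a", which depends on the external key "x".
chain = {"a": {"x"}, "b": {"a"}}
external, output = classify_keys(chain)

# Two tasks that nothing else consumes: no single output exists.
fork = {"a": {"x"}, "b": {"x"}}
try:
    classify_keys(fork)
    fork_error = None
except ValueError as exc:
    fork_error = str(exc)
```

Here `chain` reduces to the single key `"b"` with `"x"` as its only external dependency, while `fork` is rejected because both `"a"` and `"b"` would be outputs.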

    @staticmethod
    def fuse(*tasks: GraphNode, key: KeyType | None = None) -> GraphNode:
        """Fuse a set of tasks into a single task.

        The tasks are fused into a single task that will execute the tasks in a
        subgraph. The internal tasks are no longer accessible from the outside.

        All provided tasks must form a valid subgraph that will reduce to a
        single key. If multiple outputs are possible with the provided tasks, an
        exception will be raised.

        The tasks will not be rewritten but instead a new Task will be created
        that will merely reference the old task objects. This way, Task objects
        may be reused in multiple fused tasks.

        Parameters
        ----------
        key : KeyType | None, optional
            The key of the new Task object. If None provided, the key of the
            final task will be used.

        See also
        --------
        GraphNode.substitute : Easier substitution of dependencies
        """
        if any(t.key is None for t in tasks):
            raise ValueError("Cannot fuse tasks with missing keys")
        if len(tasks) == 1:
            return tasks[0].substitute({}, key=key)
        all_keys = set()
        all_deps: set[KeyType] = set()
        for t in tasks:
            all_deps.update(t.dependencies)
            all_keys.add(t.key)
        external_deps = tuple(sorted(all_deps - all_keys, key=hash))
        leafs = all_keys - all_deps
        if len(leafs) > 1:
            raise ValueError(f"Cannot fuse tasks with multiple outputs {leafs}")

        outkey = leafs.pop()
        return Task(
            key or outkey,
            _execute_subgraph,
            {t.key: t for t in tasks},
            outkey,
            external_deps,
            *(TaskRef(k) for k in external_deps),
            _data_producer=any(t.data_producer for t in tasks),
        )

    @classmethod
    @lru_cache
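The `Task(...)` call in `fuse` hands `_execute_subgraph` the inner tasks, the output key, the external dependency keys, and one `TaskRef` per external dependency. The body of `_execute_subgraph` is not shown here, so the following is only a minimal sketch of how such a function could evaluate a subgraph. It assumes a simplified layout in which each inner task is a `(function, dependency_keys)` tuple; `run_subgraph` is a hypothetical stand-in, not the library's function.

```python
def run_subgraph(inner, outkey, inkeys, *values):
    """Evaluate ``outkey`` from a small graph of inner tasks.

    ``inner`` maps key -> (function, tuple of dependency keys). ``inkeys`` and
    ``values`` pair each external dependency with its already-computed value.
    Hypothetical stand-in for illustration, not dask's ``_execute_subgraph``.
    """
    # Seed the cache with the externally supplied inputs.
    cache = dict(zip(inkeys, values))

    def compute(key):
        # Compute each inner task at most once, resolving dependencies first.
        if key not in cache:
            func, deps = inner[key]
            cache[key] = func(*(compute(d) for d in deps))
        return cache[key]

    return compute(outkey)


# Two inner tasks: "double" reads the external key "x", "plus_one" reads "double".
inner = {
    "double": (lambda x: 2 * x, ("x",)),
    "plus_one": (lambda d: d + 1, ("double",)),
}
result = run_subgraph(inner, "plus_one", ("x",), 10)
print(result)  # 21
```

Only the value for `outkey` is returned, which mirrors the statement in the docstring that the internal tasks are not accessible from the outside.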