Executes a node dynamically. This method allows a node within a workflow to trigger the run of another node (or a callable that can be built into a node) and asynchronously wait for its result. The dynamically executed node becomes a child run of the current node in the workflow.
(
self,
node: NodeLike,
node_input: Any = None,
*,
use_as_output: bool = False,
run_id: str | None = None,
use_sub_branch: bool = False,
override_branch: str | None = None,
override_isolation_scope: str | None = None,
raise_on_wait: bool = False,
)
| 409 | return ctx_with_proxy |
| 410 | |
| 411 | async def run_node( |
| 412 | self, |
| 413 | node: NodeLike, |
| 414 | node_input: Any = None, |
| 415 | *, |
| 416 | use_as_output: bool = False, |
| 417 | run_id: str | None = None, |
| 418 | use_sub_branch: bool = False, |
| 419 | override_branch: str | None = None, |
| 420 | override_isolation_scope: str | None = None, |
| 421 | raise_on_wait: bool = False, |
| 422 | ) -> Any: |
| 423 | """Executes a node dynamically. |
| 424 | |
| 425 | This method allows a node within a workflow to trigger the run of |
| 426 | another node (or a callable that can be built into a node) and |
| 427 | asynchronously wait for its result. The dynamically executed node becomes |
| 428 | a child run of the current node in the workflow. |
| 429 | |
| 430 | IMPORTANT: Always ``await`` this method directly. Wrapping it in |
| 431 | ``asyncio.create_task()`` means the task runs unsupervised — errors |
| 432 | are silently swallowed and the task is not cancelled if the parent |
| 433 | node is interrupted (e.g. via HITL). |
| 434 | |
| 435 | Args: |
| 436 | node: The node to be executed. This can be a BaseNode instance or a |
| 437 | callable that can be built into a node. |
| 438 | node_input: The input data to be passed to the dynamically executed node. |
| 439 | Defaults to None. |
| 440 | use_as_output: If True, the dynamic node's output is used as the |
| 441 | calling node's output. The calling node's own output event is |
| 442 | suppressed to avoid duplication. |
| 443 | run_id: An optional custom run ID for the dynamic node execution. |
| 444 | If not provided, a default run ID is generated. Useful for |
| 445 | correlating events across runs. |
| 446 | use_sub_branch: If True, the dynamic node will be executed in a sub-branch |
| 447 | to isolate its state and events from the main branch. |
| 448 | override_branch: An optional branch to use instead of parent's branch. |
| 449 | |
| 450 | Returns: |
| 451 | The output of the dynamically executed node, once it finishes executing. |
| 452 | """ |
| 453 | |
| 454 | if not self._node_rerun_on_resume: |
| 455 | raise ValueError( |
| 456 | 'A node must have rerun_on_resume=True. Reason is that dynamically' |
| 457 | ' scheduled nodes might be interrupted, and the workflow' |
| 458 | ' wakes-up/re-runs the parent node, so it can get the child node' |
| 459 | ' response.' |
| 460 | ) |
| 461 | |
| 462 | from ..workflow.utils._workflow_graph_utils import build_node # pylint: disable=g-import-not-at-top |
| 463 | |
| 464 | built_node = build_node(node) |
| 465 | |
| 466 | from ..agents.base_agent import BaseAgent |
| 467 | |
| 468 | if isinstance(node, BaseAgent) and isinstance(built_node, BaseAgent): |