Drive node.run(), enqueue events, return child Context. The caller reads ctx.output, ctx.route, and ctx.interrupt_ids for the node's results.
(
self,
node_input: Any = None,
*,
resume_inputs: dict[str, Any] | None = None,
)
| 110 | return self._run_id |
| 111 | |
| 112 | async def run( |
| 113 | self, |
| 114 | node_input: Any = None, |
| 115 | *, |
| 116 | resume_inputs: dict[str, Any] | None = None, |
| 117 | ) -> Context: |
| 118 | """Drive node.run(), enqueue events, return child Context. |
| 119 | |
| 120 | The caller reads ctx.output, ctx.route, and ctx.interrupt_ids |
| 121 | for the node's results. |
| 122 | """ |
| 123 | attempt_count = 1 |
| 124 | while True: |
| 125 | ctx = self._create_child_context( |
| 126 | resume_inputs, attempt_count=attempt_count |
| 127 | ) |
| 128 | logger.debug("node %s started.", ctx.node_path) |
| 129 | try: |
| 130 | # Start the span within try-except block to record exceptions on the span |
| 131 | async with node_tracing.start_as_current_node_span( |
| 132 | self._parent_ctx, self._node |
| 133 | ) as telemetry_context: |
| 134 | ctx._telemetry_context = telemetry_context |
| 135 | await self._execute_node(ctx, node_input) |
| 136 | await self._flush_output_and_deltas(ctx) |
| 137 | logger.debug("node %s end.", ctx.node_path) |
| 138 | return ctx |
| 139 | except Exception as e: |
| 140 | from ._errors import DynamicNodeFailError |
| 141 | |
| 142 | if isinstance(e, DynamicNodeFailError): |
| 143 | # TODO: consider to retry upon dynamic node failures later. This may |
| 144 | # require thorough design to consider a workflow dynamic node and a |
| 145 | # normal node. |
| 146 | ctx._error = e.error |
| 147 | ctx._error_node_path = e.error_node_path |
| 148 | logger.debug("node %s end.", ctx.node_path) |
| 149 | return ctx |
| 150 | |
| 151 | from ..events.event import Event |
| 152 | |
| 153 | logger.exception("Node execution failed with exception") |
| 154 | error_event = Event( |
| 155 | error_code=type(e).__name__, |
| 156 | error_message=str(e), |
| 157 | ) |
| 158 | await self._enqueue_event(error_event, ctx) |
| 159 | |
| 160 | if not await self._attempt_retry(e, ctx, attempt_count): |
| 161 | ctx._error = e |
| 162 | ctx._error_node_path = ctx.node_path |
| 163 | logger.debug("node %s end.", ctx.node_path) |
| 164 | return ctx |
| 165 | logger.warning( |
| 166 | "Node %s failed and is being retried locally. Note: retry count is" |
| 167 | " not persisted across resuming.", |
| 168 | self._node.name, |
| 169 | ) |