MCPcopy Index your code
hub / github.com/google/adk-python / run

Method run

src/google/adk/workflow/_node_runner.py:112–170  ·  view source on GitHub ↗

Drive node.run(), enqueue events, return child Context. The caller reads ctx.output, ctx.route, and ctx.interrupt_ids for the node's results.

(
      self,
      node_input: Any = None,
      *,
      resume_inputs: dict[str, Any] | None = None,
  )

Source from the content-addressed store, hash-verified

110 return self._run_id
111
112 async def run(
113 self,
114 node_input: Any = None,
115 *,
116 resume_inputs: dict[str, Any] | None = None,
117 ) -> Context:
118 """Drive node.run(), enqueue events, return child Context.
119
120 The caller reads ctx.output, ctx.route, and ctx.interrupt_ids
121 for the node's results.
122 """
123 attempt_count = 1
124 while True:
125 ctx = self._create_child_context(
126 resume_inputs, attempt_count=attempt_count
127 )
128 logger.debug("node %s started.", ctx.node_path)
129 try:
130 # Start the span within try-except block to record exceptions on the span
131 async with node_tracing.start_as_current_node_span(
132 self._parent_ctx, self._node
133 ) as telemetry_context:
134 ctx._telemetry_context = telemetry_context
135 await self._execute_node(ctx, node_input)
136 await self._flush_output_and_deltas(ctx)
137 logger.debug("node %s end.", ctx.node_path)
138 return ctx
139 except Exception as e:
140 from ._errors import DynamicNodeFailError
141
142 if isinstance(e, DynamicNodeFailError):
143 # TODO: consider to retry upon dynamic node failures later. This may
144 # require thorough design to consider a workflow dynamic node and a
145 # normal node.
146 ctx._error = e.error
147 ctx._error_node_path = e.error_node_path
148 logger.debug("node %s end.", ctx.node_path)
149 return ctx
150
151 from ..events.event import Event
152
153 logger.exception("Node execution failed with exception")
154 error_event = Event(
155 error_code=type(e).__name__,
156 error_message=str(e),
157 )
158 await self._enqueue_event(error_event, ctx)
159
160 if not await self._attempt_retry(e, ctx, attempt_count):
161 ctx._error = e
162 ctx._error_node_path = ctx.node_path
163 logger.debug("node %s end.", ctx.node_path)
164 return ctx
165 logger.warning(
166 "Node %s failed and is being retried locally. Note: retry count is"
167 " not persisted across resuming.",
168 self._node.name,
169 )

Callers 6

_run_node_standaloneMethod · 0.95
_start_node_taskMethod · 0.95
_run_node_loopMethod · 0.45

Calls 7

_create_child_contextMethod · 0.95
_execute_nodeMethod · 0.95
_enqueue_eventMethod · 0.95
_attempt_retryMethod · 0.95
typeFunction · 0.85
EventClass · 0.50