MCPcopy Index your code
hub / github.com/google/adk-python / _start_node_task

Method _start_node_task

src/google/adk/workflow/_workflow.py:538–627  ·  view source on GitHub ↗

Create NodeRunner and start asyncio task for a node.

(
      self,
      loop_state: _LoopState,
      ctx: Context,
      node_name: str,
      trigger: Trigger,
  )

Source from the content-addressed store, hash-verified

536 node_state.status = NodeStatus.RUNNING
537
538 def _start_node_task(
539 self,
540 loop_state: _LoopState,
541 ctx: Context,
542 node_name: str,
543 trigger: Trigger,
544 ) -> None:
545 """Create NodeRunner and start asyncio task for a node."""
546
547 assert self.graph is not None
548
549 node = self._get_static_node_by_name(node_name)
550 is_terminal = node_name in self.graph._terminal_node_names
551
552 node_state = loop_state.nodes[node_name]
553 # Reuse run_id on resume; assign a new sequential id for fresh runs.
554 run_id = node_state.run_id
555 if not run_id:
556 run_id = self._next_run_id(node_state)
557 node_state.run_id = run_id
558
559 # Intercept execution based on historical session events.
560 key = f'{node_name}@{run_id}'
561 if key in loop_state.recovered_executions:
562 recovered = loop_state.recovered_executions[key]
563
564 result = check_interception(
565 node_path=f'{ctx.node_path}/{node_name}@{run_id}',
566 node=node,
567 recovered=recovered,
568 curr_parent_ctx=ctx,
569 )
570
571 if not result.should_run:
572 is_terminal = node_name in self.graph._terminal_node_names
573 ancestor_path = ctx.node_path if is_terminal else None
574
575 if ancestor_path:
576 ancestors = [ancestor_path] + list(ctx._output_for_ancestors or [])
577 else:
578 ancestors = list(ctx._output_for_ancestors or [])
579
580 mock_ctx = create_mock_context(
581 parent_ctx=ctx,
582 node=node,
583 run_id=run_id,
584 result=result,
585 ancestors=ancestors,
586 branch=recovered.branch,
587 )
588
589 async def return_ctx():
590 if loop_state.sequence_barrier:
591 await loop_state.sequence_barrier.wait(key)
592 return mock_ctx
593
594 loop_state.pending_tasks[node_name] = asyncio.create_task(return_ctx())
595 return

Callers 1

_schedule_ready_nodesMethod · 0.95

Calls 7

_next_run_idMethod · 0.95
runMethod · 0.95
check_interceptionFunction · 0.85
create_mock_contextFunction · 0.85
NodeRunnerClass · 0.85

Tested by

no test coverage detected