MCPcopy Index your code
hub / github.com/google/adk-python / _run_async_impl

Method _run_async_impl

src/google/adk/agents/langgraph_agent.py:65–101  ·  view source on GitHub ↗
(
      self,
      ctx: InvocationContext,
  )

Source from the content-addressed store, hash-verified

63
64 @override
65 async def _run_async_impl(
66 self,
67 ctx: InvocationContext,
68 ) -> AsyncGenerator[Event, None]:
69
70 # Needed for langgraph checkpointer (for subsequent invocations; multi-turn)
71 config: RunnableConfig = {'configurable': {'thread_id': ctx.session.id}}
72
73 # Add instruction as SystemMessage if graph state is empty
74 current_graph_state = self.graph.get_state(config)
75 graph_messages = (
76 current_graph_state.values.get('messages', [])
77 if current_graph_state.values
78 else []
79 )
80 messages = (
81 [SystemMessage(content=self.instruction)]
82 if self.instruction and not graph_messages
83 else []
84 )
85 # Add events to messages (evaluating the memory used; parent agent vs checkpointer)
86 messages += self._get_messages(ctx.session.events)
87
88 # Use the Runnable
89 final_state = self.graph.invoke({'messages': messages}, config)
90 result = final_state['messages'][-1].content
91
92 result_event = Event(
93 invocation_id=ctx.invocation_id,
94 author=self.name,
95 branch=ctx.branch,
96 content=types.Content(
97 role='model',
98 parts=[types.Part.from_text(text=result)],
99 ),
100 )
101 yield result_event
102
103 def _get_messages(
104 self, events: list[Event]

Callers

nothing calls this directly

Calls 4

_get_messagesMethod · 0.95
EventClass · 0.50
getMethod · 0.45
invokeMethod · 0.45

Tested by

no test coverage detected