(
self,
ctx: InvocationContext,
)
| 63 | |
@override
async def _run_async_impl(
    self,
    ctx: InvocationContext,
) -> AsyncGenerator[Event, None]:
    """Run the wrapped graph once for this invocation and yield its reply.

    The graph is invoked with the messages derived from the session's
    events, preceded by ``self.instruction`` as a ``SystemMessage`` when
    an instruction is set and the graph state holds no messages yet.

    Args:
        ctx: Invocation context; its session id, session events,
            invocation id and branch are read.

    Yields:
        A single ``Event`` authored by ``self.name`` with role ``'model'``
        whose text is the content of the last message in the graph's
        final state.
    """
    # The session id is used as the checkpointer thread id so that
    # subsequent invocations (multi-turn) address the same thread.
    run_config: RunnableConfig = {
        'configurable': {'thread_id': ctx.session.id},
    }

    # Look up the messages the graph already holds for this thread.
    snapshot_values = self.graph.get_state(run_config).values
    prior_messages = []
    if snapshot_values:
        prior_messages = snapshot_values.get('messages', [])

    # The instruction is only sent while the graph state is still empty.
    outgoing = []
    if self.instruction and not prior_messages:
        outgoing.append(SystemMessage(content=self.instruction))

    # Per the original note, `_get_messages` accounts for which memory is
    # in use (parent agent vs checkpointer) when converting the events.
    outgoing.extend(self._get_messages(ctx.session.events))

    # Execute the runnable and take the content of its last message.
    graph_output = self.graph.invoke({'messages': outgoing}, run_config)
    reply_text = graph_output['messages'][-1].content

    yield Event(
        invocation_id=ctx.invocation_id,
        author=self.name,
        branch=ctx.branch,
        content=types.Content(
            role='model',
            parts=[types.Part.from_text(text=reply_text)],
        ),
    )
| 102 | |
| 103 | def _get_messages( |
| 104 | self, events: list[Event] |
nothing calls this directly
no test coverage detected