Consume the full agent stream and update *state* with results.

Args:
    agent: The agent (Pregel or RemoteAgent).
    stream_input: Either the initial user message dict or a `Command(resume=...)` for HITL continuation.
    config: LangGraph runnable config (thread ID, m
(
agent: Any, # noqa: ANN401
stream_input: dict[str, Any] | Command,
config: RunnableConfig,
state: StreamState,
console: Console,
file_op_tracker: FileOpTracker,
context: CLIContext,
)
```python
async def _stream_agent(
    agent: Any,  # noqa: ANN401
    stream_input: dict[str, Any] | Command,
    config: RunnableConfig,
    state: StreamState,
    console: Console,
    file_op_tracker: FileOpTracker,
    context: CLIContext,
) -> None:
    """Consume the full agent stream and update *state* with results.

    Args:
        agent: The agent (Pregel or RemoteAgent).
        stream_input: Either the initial user message dict or a
            `Command(resume=...)` for HITL continuation.
        config: LangGraph runnable config (thread ID, metadata, etc.).
        state: Shared stream state.
        console: Rich console for formatted output.
        file_op_tracker: Tracker for file-operation diffs.
        context: Runtime context for model-call middleware.
    """
    if state.spinner:
        state.spinner.start()
    try:
        async for chunk in agent.astream(
            stream_input,
            stream_mode=["messages", "updates", "custom"],
            subgraphs=True,
            config=config,
            context=context,
            durability="exit",
        ):
            _process_stream_chunk(chunk, state, console, file_op_tracker)
    finally:
        if state.spinner:
            state.spinner.stop()


async def _run_agent_loop(
```
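The listing above does two things: it ties the spinner's lifetime to the stream with `try`/`finally`, and it requests several stream modes together with `subgraphs=True`. The sketch below reproduces only that control flow with standard-library stand-ins. `FakeSpinner`, `FakeAgent`, `stream_agent`, and `run_demo` are hypothetical names used for illustration, not part of the original module. The `(namespace, mode, data)` chunk shape is an assumption about what the agent emits when given a list of stream modes plus `subgraphs=True`, and the body of `_process_stream_chunk` is not shown in the source, so a simple recorder takes its place.

```python
from __future__ import annotations

import asyncio
from typing import Any, AsyncIterator


class FakeSpinner:
    """Hypothetical stand-in for the real spinner; only tracks running state."""

    def __init__(self) -> None:
        self.running = False

    def start(self) -> None:
        self.running = True

    def stop(self) -> None:
        self.running = False


class FakeAgent:
    """Hypothetical agent whose astream() yields (namespace, mode, data) tuples."""

    def __init__(self, fail_after: int | None = None) -> None:
        self.fail_after = fail_after

    async def astream(self, stream_input: Any, **kwargs: Any) -> AsyncIterator[tuple]:
        # Assumed chunk shape: an empty namespace stands for the parent graph,
        # a non-empty one for a subgraph.
        chunks = [
            ((), "messages", "Hello"),
            ((), "updates", {"model": {"done": True}}),
            (("tools:1",), "custom", {"progress": 0.5}),
        ]
        for i, chunk in enumerate(chunks):
            if self.fail_after is not None and i >= self.fail_after:
                raise RuntimeError("stream interrupted")
            yield chunk


async def stream_agent(agent: FakeAgent, spinner: FakeSpinner | None, seen: list) -> None:
    """Mirror the start / try / finally structure of the listing above."""
    if spinner:
        spinner.start()
    try:
        async for namespace, mode, _data in agent.astream(
            {"messages": [{"role": "user", "content": "hi"}]},
            stream_mode=["messages", "updates", "custom"],
            subgraphs=True,
        ):
            # Recorder in place of the real per-chunk handler.
            seen.append((namespace, mode))
    finally:
        # Runs on normal completion and when the stream raises.
        if spinner:
            spinner.stop()


def run_demo() -> tuple[list, bool, bool]:
    """Run one clean stream and one failing stream; report what happened."""
    ok_spinner, bad_spinner = FakeSpinner(), FakeSpinner()
    seen: list = []
    asyncio.run(stream_agent(FakeAgent(), ok_spinner, seen))
    raised = False
    try:
        asyncio.run(stream_agent(FakeAgent(fail_after=1), bad_spinner, []))
    except RuntimeError:
        raised = True
    return seen, raised, ok_spinner.running or bad_spinner.running
```

In this sketch the `finally` clause is what keeps a mid-stream failure from leaving the spinner running: the exception still propagates to the caller, but the spinner is stopped first.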