handleStream reads a chat.MessageStream to completion, emitting streaming events (content deltas, partial tool calls, reasoning tokens) and returning the aggregated streamResult. The caller is responsible for adding the resulting assistant message to the session. cancelStream, when non-nil, is call
(ctx context.Context, cancelStream context.CancelCauseFunc, stream chat.MessageStream, a *agent.Agent, agentTools []tools.Tool, sess *session.Session, m *modelsdev.Model, tel Telemetry, events EventSink, idleTimeout time.Duration)
| 71 | // so the dependency direction is explicit (the loop calls into the chunker, |
| 72 | // never the reverse). |
| 73 | func handleStream(ctx context.Context, cancelStream context.CancelCauseFunc, stream chat.MessageStream, a *agent.Agent, agentTools []tools.Tool, sess *session.Session, m *modelsdev.Model, tel Telemetry, events EventSink, idleTimeout time.Duration) (streamResult, error) { |
| 74 | // done is closed when handleStream exits (for any reason) so the reader |
| 75 | // goroutine below can detect it and stop trying to send on recvCh. |
| 76 | done := make(chan struct{}) |
| 77 | defer close(done) |
| 78 | defer stream.Close() |
| 79 | |
| 80 | type recvResult struct { |
| 81 | response chat.MessageStreamResponse |
| 82 | err error |
| 83 | } |
| 84 | recvCh := make(chan recvResult, 1) |
| 85 | |
| 86 | // Read chunks in a dedicated goroutine so the main select below can |
| 87 | // enforce both context cancellation and the idle timeout without |
| 88 | // blocking on a potentially stalled network read. |
| 89 | go func() { |
| 90 | for { |
| 91 | r, err := stream.Recv() |
| 92 | select { |
| 93 | case recvCh <- recvResult{r, err}: |
| 94 | if err != nil { |
| 95 | return |
| 96 | } |
| 97 | case <-done: |
| 98 | return |
| 99 | case <-ctx.Done(): |
| 100 | return |
| 101 | } |
| 102 | } |
| 103 | }() |
| 104 | |
| 105 | idleTimer := time.NewTimer(idleTimeout) |
| 106 | defer idleTimer.Stop() |
| 107 | |
| 108 | var fullContent strings.Builder |
| 109 | var fullReasoningContent strings.Builder |
| 110 | var thinkingSignature string |
| 111 | var thoughtSignature []byte |
| 112 | var toolCalls []tools.ToolCall |
| 113 | var messageUsage *chat.Usage |
| 114 | var providerFinishReason chat.FinishReason |
| 115 | |
| 116 | toolCallIndex := make(map[string]int) // toolCallID -> index in toolCalls slice |
| 117 | emittedPartial := make(map[string]bool) // toolCallID -> whether we've emitted a partial event |
| 118 | toolDefMap := make(map[string]tools.Tool, len(agentTools)) |
| 119 | |
| 120 | // xmlToolCallGate suppresses AgentChoice events once a <tool_call> tag is |
| 121 | // seen, preventing raw XML from rendering in the TUI. |
| 122 | xmlToolCallGate := false |
| 123 | for _, t := range agentTools { |
| 124 | toolDefMap[t.Name] = t |
| 125 | } |
| 126 | |
| 127 | // applyXMLFallback extracts <tool_call> blocks from accumulated content when |
| 128 | // no structured tool calls were received. Called from both the early-return |
| 129 | // and EOF paths. |
| 130 | applyXMLFallback := func() { |