MCPcopy Index your code
hub / github.com/docker/docker-agent / handleStream

Function handleStream

pkg/runtime/streaming.go:73–406  ·  view source on GitHub ↗

handleStream reads a chat.MessageStream to completion, emitting streaming events (content deltas, partial tool calls, reasoning tokens) and returning the aggregated streamResult. The caller is responsible for adding the resulting assistant message to the session. cancelStream, when non-nil, is call

(ctx context.Context, cancelStream context.CancelCauseFunc, stream chat.MessageStream, a *agent.Agent, agentTools []tools.Tool, sess *session.Session, m *modelsdev.Model, tel Telemetry, events EventSink, idleTimeout time.Duration)

Source from the content-addressed store, hash-verified

71// so the dependency direction is explicit (the loop calls into the chunker,
72// never the reverse).
73func handleStream(ctx context.Context, cancelStream context.CancelCauseFunc, stream chat.MessageStream, a *agent.Agent, agentTools []tools.Tool, sess *session.Session, m *modelsdev.Model, tel Telemetry, events EventSink, idleTimeout time.Duration) (streamResult, error) {
74 // done is closed when handleStream exits (for any reason) so the reader
75 // goroutine below can detect it and stop trying to send on recvCh.
76 done := make(chan struct{})
77 defer close(done)
78 defer stream.Close()
79
80 type recvResult struct {
81 response chat.MessageStreamResponse
82 err error
83 }
84 recvCh := make(chan recvResult, 1)
85
86 // Read chunks in a dedicated goroutine so the main select below can
87 // enforce both context cancellation and the idle timeout without
88 // blocking on a potentially stalled network read.
89 go func() {
90 for {
91 r, err := stream.Recv()
92 select {
93 case recvCh <- recvResult{r, err}:
94 if err != nil {
95 return
96 }
97 case <-done:
98 return
99 case <-ctx.Done():
100 return
101 }
102 }
103 }()
104
105 idleTimer := time.NewTimer(idleTimeout)
106 defer idleTimer.Stop()
107
108 var fullContent strings.Builder
109 var fullReasoningContent strings.Builder
110 var thinkingSignature string
111 var thoughtSignature []byte
112 var toolCalls []tools.ToolCall
113 var messageUsage *chat.Usage
114 var providerFinishReason chat.FinishReason
115
116 toolCallIndex := make(map[string]int) // toolCallID -> index in toolCalls slice
117 emittedPartial := make(map[string]bool) // toolCallID -> whether we've emitted a partial event
118 toolDefMap := make(map[string]tools.Tool, len(agentTools))
119
120 // xmlToolCallGate suppresses AgentChoice events once a <tool_call> tag is
121 // seen, preventing raw XML from rendering in the TUI.
122 xmlToolCallGate := false
123 for _, t := range agentTools {
124 toolDefMap[t.Name] = t
125 }
126
127 // applyXMLFallback extracts <tool_call> blocks from accumulated content when
128 // no structured tool calls were received. Called from both the early-return
129 // and EOF paths.
130 applyXMLFallback := func() {

Calls 15

LenMethod · 0.95
extractXMLToolCallsFunction · 0.85
PartialToolCallFunction · 0.85
AgentChoiceReasoningFunction · 0.85
AgentChoiceFunction · 0.85
SetUsageMethod · 0.80
TotalCostMethod · 0.80
CloseMethod · 0.65
RecvMethod · 0.65
StopMethod · 0.65
NameMethod · 0.65
ResetMethod · 0.65