MCPcopy
hub / github.com/docker/docker-agent / handleStreamError

Method handleStreamError

pkg/runtime/loop_steps.go:143–190  ·  view source on GitHub ↗

handleStreamError classifies the error returned by tryModelWithFallback and either drives auto-compaction recovery (allowed at most r.maxOverflowCompactions consecutive times) or surfaces a fatal error. Context cancellation is treated as a graceful stop. *overflowCompactions is incremented each time a compaction retry is attempted.

(
	ctx context.Context,
	sess *session.Session,
	a *agent.Agent,
	err error,
	contextLimit int64,
	overflowCompactions *int,
	streamSpan trace.Span,
	events EventSink,
)

Source from the content-addressed store, hash-verified

141// verify both the "compaction succeeded → retry" and "compaction
142// exhausted → fatal" outcomes without instantiating models.
143func (r *LocalRuntime) handleStreamError(
144 ctx context.Context,
145 sess *session.Session,
146 a *agent.Agent,
147 err error,
148 contextLimit int64,
149 overflowCompactions *int,
150 streamSpan trace.Span,
151 events EventSink,
152) streamErrorOutcome {
153 // Treat context cancellation as a graceful stop.
154 if errors.Is(err, context.Canceled) {
155 slog.DebugContext(ctx, "Model stream canceled by context", "agent", a.Name(), "session_id", sess.ID)
156 return streamErrorFatal
157 }
158
159 // Auto-recovery: if the error is a context overflow and session
160 // compaction is enabled, compact the conversation and retry the
161 // request instead of surfacing raw errors. We allow at most
162 // r.maxOverflowCompactions consecutive attempts to avoid an infinite
163 // loop when compaction cannot reduce the context enough.
164 if _, ok := errors.AsType[*modelerrors.ContextOverflowError](err); ok && r.sessionCompaction && *overflowCompactions < r.maxOverflowCompactions {
165 *overflowCompactions++
166 slog.WarnContext(ctx, "Context window overflow detected, attempting auto-compaction",
167 "agent", a.Name(),
168 "session_id", sess.ID,
169 "input_tokens", sess.InputTokens,
170 "output_tokens", sess.OutputTokens,
171 "context_limit", contextLimit,
172 "attempt", *overflowCompactions,
173 )
174 events.Emit(Warning(
175 "The conversation has exceeded the model's context window. Automatically compacting the conversation history...",
176 a.Name(),
177 ))
178 r.compactWithReason(ctx, sess, "", compactionReasonOverflow, events)
179 return streamErrorRetry
180 }
181
182 streamSpan.RecordError(err)
183 streamSpan.SetStatus(codes.Error, "error handling stream")
184 slog.ErrorContext(ctx, "All models failed", "agent", a.Name(), "error", err)
185 r.telemetry.RecordError(ctx, err.Error())
186 errMsg := modelerrors.FormatError(err)
187 events.Emit(ErrorWithCodeForSession(sess.ID, classifyErrorCode(err), errMsg))
188 r.notifyError(ctx, a, sess.ID, errMsg)
189 return streamErrorFatal
190}
191
192// classifyErrorCode maps a model error to an ErrorCode constant for
193// structured error events. The classification mirrors [modelerrors]

Calls 10

compactWithReasonMethod · 0.95
notifyErrorMethod · 0.95
FormatErrorFunction · 0.92
WarningFunction · 0.85
ErrorWithCodeForSessionFunction · 0.85
classifyErrorCodeFunction · 0.85
NameMethod · 0.65
EmitMethod · 0.65
RecordErrorMethod · 0.65
ErrorMethod · 0.45