handleStreamError classifies the error returned by tryModelWithFallback and either drives auto-compaction recovery (allowed at most r.maxOverflowCompactions consecutive times) or surfaces a fatal error. Context cancellation is treated as a graceful stop. *overflowCompactions is incremented on retry
( ctx context.Context, sess *session.Session, a *agent.Agent, err error, contextLimit int64, overflowCompactions *int, streamSpan trace.Span, events EventSink, )
| 141 | // verify both the "compaction succeeded → retry" and "compaction |
| 142 | // exhausted → fatal" outcomes without instantiating models. |
| 143 | func (r *LocalRuntime) handleStreamError( |
| 144 | ctx context.Context, |
| 145 | sess *session.Session, |
| 146 | a *agent.Agent, |
| 147 | err error, |
| 148 | contextLimit int64, |
| 149 | overflowCompactions *int, |
| 150 | streamSpan trace.Span, |
| 151 | events EventSink, |
| 152 | ) streamErrorOutcome { |
| 153 | // Treat context cancellation as a graceful stop. |
| 154 | if errors.Is(err, context.Canceled) { |
| 155 | slog.DebugContext(ctx, "Model stream canceled by context", "agent", a.Name(), "session_id", sess.ID) |
| 156 | return streamErrorFatal |
| 157 | } |
| 158 | |
| 159 | // Auto-recovery: if the error is a context overflow and session |
| 160 | // compaction is enabled, compact the conversation and retry the |
| 161 | // request instead of surfacing raw errors. We allow at most |
| 162 | // r.maxOverflowCompactions consecutive attempts to avoid an infinite |
| 163 | // loop when compaction cannot reduce the context enough. |
| 164 | if _, ok := errors.AsType[*modelerrors.ContextOverflowError](err); ok && r.sessionCompaction && *overflowCompactions < r.maxOverflowCompactions { |
| 165 | *overflowCompactions++ |
| 166 | slog.WarnContext(ctx, "Context window overflow detected, attempting auto-compaction", |
| 167 | "agent", a.Name(), |
| 168 | "session_id", sess.ID, |
| 169 | "input_tokens", sess.InputTokens, |
| 170 | "output_tokens", sess.OutputTokens, |
| 171 | "context_limit", contextLimit, |
| 172 | "attempt", *overflowCompactions, |
| 173 | ) |
| 174 | events.Emit(Warning( |
| 175 | "The conversation has exceeded the model's context window. Automatically compacting the conversation history...", |
| 176 | a.Name(), |
| 177 | )) |
| 178 | r.compactWithReason(ctx, sess, "", compactionReasonOverflow, events) |
| 179 | return streamErrorRetry |
| 180 | } |
| 181 | |
| 182 | streamSpan.RecordError(err) |
| 183 | streamSpan.SetStatus(codes.Error, "error handling stream") |
| 184 | slog.ErrorContext(ctx, "All models failed", "agent", a.Name(), "error", err) |
| 185 | r.telemetry.RecordError(ctx, err.Error()) |
| 186 | errMsg := modelerrors.FormatError(err) |
| 187 | events.Emit(ErrorWithCodeForSession(sess.ID, classifyErrorCode(err), errMsg)) |
| 188 | r.notifyError(ctx, a, sess.ID, errMsg) |
| 189 | return streamErrorFatal |
| 190 | } |
| 191 | |
| 192 | // classifyErrorCode maps a model error to an ErrorCode constant for |
| 193 | // structured error events. The classification mirrors [modelerrors] |