runStreamLoop is the body of RunStream. It was pulled out of the anonymous goroutine so that it has a real name in stack traces and is easier to navigate in editors.
(ctx context.Context, sess *session.Session, events chan Event)
| 234 | // goroutine so it has a real name in stack traces and is easier to navigate |
| 235 | // in editors. |
| 236 | func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, events chan Event) { |
| 237 | sink := &channelSink{ch: events} |
| 238 | |
| 239 | // Seed the cagent session ID at the run-loop boundary so any |
| 240 | // gateway-bound HTTP call originating from this loop can correlate |
| 241 | // back to the originating session. Plumbing happens in |
| 242 | // pkg/httpclient/userAgentTransport, gated on `X-Cagent-Forward`. |
| 243 | ctx = httpclient.ContextWithSessionID(ctx, sess.ID) |
| 244 | r.telemetry.RecordSessionStart(ctx, r.currentAgentName(), sess.ID) |
| 245 | |
| 246 | // Seed `gen_ai.conversation.id` into baggage at the session |
| 247 | // boundary. Every span the runtime, providers, MCP client, RAG, |
| 248 | // sandbox, evaluation, hooks, and (downstream) any subprocess |
| 249 | // or remote service create from here on will pick it up |
| 250 | // automatically without per-helper plumbing — and the value |
| 251 | // rides over W3C `baggage` so it crosses MCP / sandbox / |
| 252 | // HTTP boundaries too. |
| 253 | ctx = genai.WithConversationID(ctx, sess.ID) |
| 254 | |
| 255 | // A non-interactive session (background_agents via runCollecting, MCP |
| 256 | // serve, A2A, evals) has no live UI that can answer an OAuth elicitation, |
| 257 | // and runCollecting drops events rather than forwarding them. If a remote |
| 258 | // MCP toolset needs first-time OAuth, blocking on an elicitation that |
| 259 | // nobody can answer hangs the sub-agent forever (issue #3200): the |
| 260 | // connection context is detached with context.WithoutCancel, so not even |
| 261 | // cancellation can unblock it. Mark the context so toolset Start() fails |
| 262 | // fast with an AuthorizationRequiredError — the same fast-fail the startup |
| 263 | // tool probe uses (see EmitStartupInfo) — instead of eliciting. A real user |
| 264 | // authorizes such servers from an interactive turn (or transfer_task, which |
| 265 | // keeps NonInteractive=false and forwards the dialog to the TUI). |
| 266 | if sess.NonInteractive { |
| 267 | ctx = mcptools.WithoutInteractivePrompts(ctx) |
| 268 | } |
| 269 | |
| 270 | // runtime.session is the root span for one stream. gen_ai.* keys |
| 271 | // are emitted alongside the legacy `agent` / `session.id` keys |
| 272 | // so existing dashboards keep matching while spec-aware tooling |
| 273 | // can filter by `gen_ai.conversation.id` and |
| 274 | // `cagent.agent.name`. Legacy keys drop out under |
| 275 | // OTEL_SEMCONV_STABILITY_OPT_IN=gen_ai_latest_experimental. |
| 276 | sessionAttrs := []attribute.KeyValue{ |
| 277 | attribute.String(genai.AttrConversationID, sess.ID), |
| 278 | attribute.String(genai.AttrAgentNameRuntime, r.currentAgentName()), |
| 279 | } |
| 280 | if genai.EmitLegacyAttributes() { |
| 281 | sessionAttrs = append(sessionAttrs, |
| 282 | attribute.String("agent", r.currentAgentName()), |
| 283 | attribute.String("session.id", sess.ID), |
| 284 | ) |
| 285 | } |
| 286 | ctx, sessionSpan := r.startSpan(ctx, "runtime.session", trace.WithAttributes(sessionAttrs...)) |
| 287 | defer sessionSpan.End() |
| 288 | |
| 289 | // Swap in this stream's events channel for elicitation and save the |
| 290 | // previous one so it can be restored on teardown. This allows nested |
| 291 | // RunStream calls to temporarily own elicitation without losing the |
| 292 | // parent's channel. |
| 293 | prevElicitationCh := r.elicitation.swap(events) |
no test coverage detected