MCPcopy Index your code
hub / github.com/docker/docker-agent / runStreamLoop

Method runStreamLoop

pkg/runtime/loop.go:236–536  ·  view source on GitHub ↗

runStreamLoop is the body of RunStream. Pulled out of the anonymous goroutine so it has a real name in stack traces and is easier to navigate in editors.

(ctx context.Context, sess *session.Session, events chan Event)

Source from the content-addressed store, hash-verified

234// goroutine so it has a real name in stack traces and is easier to navigate
235// in editors.
236func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, events chan Event) {
237 sink := &channelSink{ch: events}
238
239 // Seed the cagent session ID at the run-loop boundary so any
240 // gateway-bound HTTP call originating from this loop can correlate
241 // back to the originating session. Plumbing happens in
242 // pkg/httpclient/userAgentTransport, gated on `X-Cagent-Forward`.
243 ctx = httpclient.ContextWithSessionID(ctx, sess.ID)
244 r.telemetry.RecordSessionStart(ctx, r.currentAgentName(), sess.ID)
245
246 // Seed `gen_ai.conversation.id` into baggage at the session
247 // boundary. Every span the runtime, providers, MCP client, RAG,
248 // sandbox, evaluation, hooks, and (downstream) any subprocess
249 // or remote service create from here on will pick it up
250 // automatically without per-helper plumbing — and the value
251 // rides over W3C `baggage` so it crosses MCP / sandbox /
252 // HTTP boundaries too.
253 ctx = genai.WithConversationID(ctx, sess.ID)
254
255 // A non-interactive session (background_agents via runCollecting, MCP
256 // serve, A2A, evals) has no live UI that can answer an OAuth elicitation,
257 // and runCollecting drops events rather than forwarding them. If a remote
258 // MCP toolset needs first-time OAuth, blocking on an elicitation that
259 // nobody can answer hangs the sub-agent forever (issue #3200): the
260 // connection context is detached with context.WithoutCancel, so not even
261 // cancellation can unblock it. Mark the context so toolset Start() fails
262 // fast with an AuthorizationRequiredError — the same fast-fail the startup
263 // tool probe uses (see EmitStartupInfo) — instead of eliciting. A real user
264 // authorizes such servers from an interactive turn (or transfer_task, which
265 // keeps NonInteractive=false and forwards the dialog to the TUI).
266 if sess.NonInteractive {
267 ctx = mcptools.WithoutInteractivePrompts(ctx)
268 }
269
270 // runtime.session is the root span for one stream. gen_ai.* keys
271 // are emitted alongside the legacy `agent` / `session.id` keys
272 // so existing dashboards keep matching while spec-aware tooling
273 // can filter by `gen_ai.conversation.id` and
274 // `cagent.agent.name`. Legacy keys drop out under
275 // OTEL_SEMCONV_STABILITY_OPT_IN=gen_ai_latest_experimental.
276 sessionAttrs := []attribute.KeyValue{
277 attribute.String(genai.AttrConversationID, sess.ID),
278 attribute.String(genai.AttrAgentNameRuntime, r.currentAgentName()),
279 }
280 if genai.EmitLegacyAttributes() {
281 sessionAttrs = append(sessionAttrs,
282 attribute.String("agent", r.currentAgentName()),
283 attribute.String("session.id", sess.ID),
284 )
285 }
286 ctx, sessionSpan := r.startSpan(ctx, "runtime.session", trace.WithAttributes(sessionAttrs...))
287 defer sessionSpan.End()
288
289 // Swap in this stream's events channel for elicitation and save the
290 // previous one so it can be restored on teardown. This allows nested
291 // RunStream calls to temporarily own elicitation without losing the
292 // parent's channel.
293 prevElicitationCh := r.elicitation.swap(events)

Callers 1

RunStreamMethod · 0.95

Calls 15

currentAgentNameMethod · 0.95
startSpanMethod · 0.95
finalizeEventChannelMethod · 0.95
resolveSessionAgentMethod · 0.95
EmitMethod · 0.95
agentDetailsFromTeamMethod · 0.95
emitAgentWarningsMethod · 0.95
getToolsMethod · 0.95
skillSubSessionToolsMethod · 0.95

Tested by

no test coverage detected