MCPcopy Index your code
hub / github.com/docker/docker-agent / RunStream

Method RunStream

pkg/runtime/remote_runtime.go:259–312  ·  view source on GitHub ↗

RunStream starts the agent's interaction loop and returns a channel of events

(ctx context.Context, sess *session.Session)

Source from the content-addressed store, hash-verified

257
258// RunStream starts the agent's interaction loop and returns a channel of events
259func (r *RemoteRuntime) RunStream(ctx context.Context, sess *session.Session) <-chan Event {
260 slog.DebugContext(ctx, "Starting remote runtime stream", "agent", r.currentAgent, "session_id", r.sessionID)
261 events := make(chan Event, defaultEventChannelCapacity)
262
263 go func() {
264 defer close(events)
265
266 messages := r.convertSessionMessages(sess)
267 r.sessionID = sess.ID
268
269 // Snapshot the queued override but do NOT clear it yet: if the
270 // request fails before the server can persist it, clearing here
271 // would silently drop the user's switch. We only clear after the
272 // server has accepted the request (i.e. RunAgent returned a stream).
273 r.pendingMu.Lock()
274 model := r.pendingModelOverride
275 r.pendingMu.Unlock()
276
277 var streamChan <-chan Event
278 var err error
279
280 if r.currentAgent != "" {
281 streamChan, err = r.client.RunAgentWithAgentName(ctx, r.sessionID, r.agentFilename, r.currentAgent, messages, model)
282 } else {
283 streamChan, err = r.client.RunAgent(ctx, r.sessionID, r.agentFilename, messages, model)
284 }
285
286 if err != nil {
287 events <- Error(fmt.Sprintf("failed to start remote agent: %v", err))
288 return
289 }
290
291 // Server accepted the request, so the override (if any) has been
292 // forwarded; clear it but only if no concurrent SetAgentModel
293 // queued a newer ref while we were dispatching.
294 if model != "" {
295 r.pendingMu.Lock()
296 if r.pendingModelOverride == model {
297 r.pendingModelOverride = ""
298 }
299 r.pendingMu.Unlock()
300 }
301
302 // Consume events from the agent stream
303 for streamEvent := range streamChan {
304 if elicitationRequest, ok := streamEvent.(*ElicitationRequestEvent); ok {
305 r.pendingOAuthElicitation = elicitationRequest
306 }
307 events <- streamEvent
308 }
309 }()
310
311 return events
312}
313
314// Run starts the agent's interaction loop and returns the final messages
315func (r *RemoteRuntime) Run(ctx context.Context, sess *session.Session) ([]session.Message, error) {

Calls 6

ErrorFunction · 0.70
RunAgentWithAgentNameMethod · 0.65
RunAgentMethod · 0.65
LockMethod · 0.45
UnlockMethod · 0.45