RunStream starts the agent's interaction loop and returns a channel of events
(ctx context.Context, sess *session.Session)
| 257 | |
| 258 | // RunStream starts the agent's interaction loop and returns a channel of events |
| 259 | func (r *RemoteRuntime) RunStream(ctx context.Context, sess *session.Session) <-chan Event { |
| 260 | slog.DebugContext(ctx, "Starting remote runtime stream", "agent", r.currentAgent, "session_id", r.sessionID) |
| 261 | events := make(chan Event, defaultEventChannelCapacity) |
| 262 | |
| 263 | go func() { |
| 264 | defer close(events) |
| 265 | |
| 266 | messages := r.convertSessionMessages(sess) |
| 267 | r.sessionID = sess.ID |
| 268 | |
| 269 | // Snapshot the queued override but do NOT clear it yet: if the |
| 270 | // request fails before the server can persist it, clearing here |
| 271 | // would silently drop the user's switch. We only clear after the |
| 272 | // server has accepted the request (i.e. RunAgent returned a stream). |
| 273 | r.pendingMu.Lock() |
| 274 | model := r.pendingModelOverride |
| 275 | r.pendingMu.Unlock() |
| 276 | |
| 277 | var streamChan <-chan Event |
| 278 | var err error |
| 279 | |
| 280 | if r.currentAgent != "" { |
| 281 | streamChan, err = r.client.RunAgentWithAgentName(ctx, r.sessionID, r.agentFilename, r.currentAgent, messages, model) |
| 282 | } else { |
| 283 | streamChan, err = r.client.RunAgent(ctx, r.sessionID, r.agentFilename, messages, model) |
| 284 | } |
| 285 | |
| 286 | if err != nil { |
| 287 | events <- Error(fmt.Sprintf("failed to start remote agent: %v", err)) |
| 288 | return |
| 289 | } |
| 290 | |
| 291 | // Server accepted the request, so the override (if any) has been |
| 292 | // forwarded; clear it but only if no concurrent SetAgentModel |
| 293 | // queued a newer ref while we were dispatching. |
| 294 | if model != "" { |
| 295 | r.pendingMu.Lock() |
| 296 | if r.pendingModelOverride == model { |
| 297 | r.pendingModelOverride = "" |
| 298 | } |
| 299 | r.pendingMu.Unlock() |
| 300 | } |
| 301 | |
| 302 | // Consume events from the agent stream |
| 303 | for streamEvent := range streamChan { |
| 304 | if elicitationRequest, ok := streamEvent.(*ElicitationRequestEvent); ok { |
| 305 | r.pendingOAuthElicitation = elicitationRequest |
| 306 | } |
| 307 | events <- streamEvent |
| 308 | } |
| 309 | }() |
| 310 | |
| 311 | return events |
| 312 | } |
| 313 | |
| 314 | // Run starts the agent's interaction loop and returns the final messages |
| 315 | func (r *RemoteRuntime) Run(ctx context.Context, sess *session.Session) ([]session.Message, error) { |