PollUntilTerminal polls GetTask with exponential backoff until the task reaches a terminal state, the context is canceled, or an error occurs. Newly discovered artifacts and status changes are emitted to the client. client is the SDK client used to issue the GetTask calls, and agentID is the bridge's registered agent ID, used to label the errors this function returns.
( ctx context.Context, client *a2aclient.Client, agentID string, conversationID string, toolCallID string, taskID a2a.TaskID, lastSeen *a2a.Task, alreadyEmitted []string, o agent.OutputHandler, )
| 242 | // the caller. It prevents re-emission across the streaming<->polling handoff |
| 243 | // and across invocation boundaries. |
| 244 | func PollUntilTerminal( |
| 245 | ctx context.Context, |
| 246 | client *a2aclient.Client, |
| 247 | agentID string, |
| 248 | conversationID string, |
| 249 | toolCallID string, |
| 250 | taskID a2a.TaskID, |
| 251 | lastSeen *a2a.Task, |
| 252 | alreadyEmitted []string, |
| 253 | o agent.OutputHandler, |
| 254 | ) error { |
| 255 | emittedArtifactIDs := make(map[a2a.ArtifactID]bool) |
| 256 | if lastSeen != nil { |
| 257 | for _, art := range lastSeen.Artifacts { |
| 258 | emittedArtifactIDs[art.ID] = true |
| 259 | } |
| 260 | } |
| 261 | for _, id := range alreadyEmitted { |
| 262 | emittedArtifactIDs[a2a.ArtifactID(id)] = true |
| 263 | } |
| 264 | |
| 265 | const initialDelay = 250 * time.Millisecond |
| 266 | const maxDelay = 60 * time.Second |
| 267 | |
| 268 | delay := initialDelay |
| 269 | for { |
| 270 | select { |
| 271 | case <-ctx.Done(): |
| 272 | return ctx.Err() |
| 273 | case <-time.After(delay): |
| 274 | } |
| 275 | |
| 276 | task, err := client.GetTask(ctx, &a2a.GetTaskRequest{ID: taskID}) |
| 277 | if err != nil { |
| 278 | // If the server has dropped the task while we were polling, |
| 279 | // the marker is stale: clear it and surface a clean error. |
| 280 | if errors.Is(err, a2a.ErrTaskNotFound) { |
| 281 | _ = ClearStateMarker(conversationID, o) |
| 282 | return fmt.Errorf("a2a agent %s: task %s no longer exists on server (state marker cleared)", agentID, taskID) |
| 283 | } |
| 284 | return fmt.Errorf("a2a agent %s: poll GetTask(%s): %w", agentID, taskID, err) |
| 285 | } |
| 286 | |
| 287 | // Emit newly observed artifacts. Track whether anything new |
| 288 | // arrived this iteration. |
| 289 | sawNewArtifact := false |
| 290 | for _, art := range task.Artifacts { |
| 291 | if emittedArtifactIDs[art.ID] { |
| 292 | continue |
| 293 | } |
| 294 | emittedArtifactIDs[art.ID] = true |
| 295 | sawNewArtifact = true |
| 296 | msgs := a2aArtifactToMessages(art, "agent") |
| 297 | if len(msgs) > 0 { |
| 298 | if err := o(&proto.AgentOutputs{Messages: msgs}); err != nil { |
| 299 | return err |
| 300 | } |
| 301 | } |
no test coverage detected