MCPcopy
hub / github.com/google/ax / PollUntilTerminal

Function PollUntilTerminal

internal/experimental/a2abridge/a2a_event_handler.go:244–332  ·  view source on GitHub ↗

PollUntilTerminal polls GetTask with exponential backoff until the task reaches a terminal state, the context is canceled, or an error occurs. Newly discovered artifacts and status changes are emitted to the client. client and agentID identify the SDK client and the bridge's registered agent ID for …

(
	ctx context.Context,
	client *a2aclient.Client,
	agentID string,
	conversationID string,
	toolCallID string,
	taskID a2a.TaskID,
	lastSeen *a2a.Task,
	alreadyEmitted []string,
	o agent.OutputHandler,
)

Source from the content-addressed store, hash-verified

242// the caller. It prevents re-emission across the streaming<->polling handoff
243// and across invocation boundaries.
244func PollUntilTerminal(
245 ctx context.Context,
246 client *a2aclient.Client,
247 agentID string,
248 conversationID string,
249 toolCallID string,
250 taskID a2a.TaskID,
251 lastSeen *a2a.Task,
252 alreadyEmitted []string,
253 o agent.OutputHandler,
254) error {
255 emittedArtifactIDs := make(map[a2a.ArtifactID]bool)
256 if lastSeen != nil {
257 for _, art := range lastSeen.Artifacts {
258 emittedArtifactIDs[art.ID] = true
259 }
260 }
261 for _, id := range alreadyEmitted {
262 emittedArtifactIDs[a2a.ArtifactID(id)] = true
263 }
264
265 const initialDelay = 250 * time.Millisecond
266 const maxDelay = 60 * time.Second
267
268 delay := initialDelay
269 for {
270 select {
271 case <-ctx.Done():
272 return ctx.Err()
273 case <-time.After(delay):
274 }
275
276 task, err := client.GetTask(ctx, &a2a.GetTaskRequest{ID: taskID})
277 if err != nil {
278 // If the server has dropped the task while we were polling,
279 // the marker is stale: clear it and surface a clean error.
280 if errors.Is(err, a2a.ErrTaskNotFound) {
281 _ = ClearStateMarker(conversationID, o)
282 return fmt.Errorf("a2a agent %s: task %s no longer exists on server (state marker cleared)", agentID, taskID)
283 }
284 return fmt.Errorf("a2a agent %s: poll GetTask(%s): %w", agentID, taskID, err)
285 }
286
287 // Emit newly observed artifacts. Track whether anything new
288 // arrived this iteration.
289 sawNewArtifact := false
290 for _, art := range task.Artifacts {
291 if emittedArtifactIDs[art.ID] {
292 continue
293 }
294 emittedArtifactIDs[art.ID] = true
295 sawNewArtifact = true
296 msgs := a2aArtifactToMessages(art, "agent")
297 if len(msgs) > 0 {
298 if err := o(&proto.AgentOutputs{Messages: msgs}); err != nil {
299 return err
300 }
301 }

Callers 2

subscribeAndProcessMethod · 0.92
IterateStreamFunction · 0.85

Calls 6

ClearStateMarkerFunction · 0.85
a2aArtifactToMessagesFunction · 0.85
AuthRequiredErrorFunction · 0.85
EmitConfirmationFunction · 0.85
emittedArtifactKeysFunction · 0.85
AfterMethod · 0.80

Tested by

no test coverage detected