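handleEvent dispatches a single event from the A2A stream. Returns (stop, err): stop is true when the caller should stop reading the stream (e.g. a human-in-the-loop (HITL) pause while the task waits for user input); err is non-nil when handling the event failed.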
(evt a2a.Event)
| 54 | // handleEvent dispatches a single event from the A2A stream. Returns |
| 55 | // (stop, err): stop indicates the stream should end (e.g. HITL pause). |
| 56 | func (s *streamState) handleEvent(evt a2a.Event) (bool, error) { |
| 57 | switch e := evt.(type) { |
| 58 | case *a2a.Message: |
| 59 | // Direct message response (no task). |
| 60 | msgs := a2aPartsToMessages(e.Parts, a2aRoleToRole(e.Role)) |
| 61 | if len(msgs) > 0 { |
| 62 | if err := s.o(&proto.AgentOutputs{Messages: msgs}); err != nil { |
| 63 | return false, err |
| 64 | } |
| 65 | } |
| 66 | return false, nil |
| 67 | |
| 68 | case *a2a.Task: |
| 69 | // Snapshot of the task. Track it; emit any artifacts we haven't |
| 70 | // seen yet. If the task is in INPUT_REQUIRED, treat it the same |
| 71 | // way as the corresponding TaskStatusUpdateEvent: surface a |
| 72 | // ConfirmationContent and persist the activeTaskID, then stop |
| 73 | // the loop. |
| 74 | s.lastSeenTask = e |
| 75 | if e.ID != "" { |
| 76 | s.activeTaskID = e.ID |
| 77 | } |
| 78 | for _, art := range e.Artifacts { |
| 79 | if s.emittedArtifact[art.ID] { |
| 80 | continue |
| 81 | } |
| 82 | s.emittedArtifact[art.ID] = true |
| 83 | msgs := a2aArtifactToMessages(art, "agent") |
| 84 | if len(msgs) > 0 { |
| 85 | if err := s.o(&proto.AgentOutputs{Messages: msgs}); err != nil { |
| 86 | return false, err |
| 87 | } |
| 88 | } |
| 89 | } |
| 90 | if e.Status.State == a2a.TaskStateAuthRequired { |
| 91 | // Server is asking for refreshed credentials. The current static |
| 92 | // credential model can't hot-reload, so mark terminal, |
| 93 | // clear the marker, and return an actionable error. |
| 94 | s.terminal = true |
| 95 | _ = ClearStateMarker(s.conversationID, s.o) |
| 96 | return false, AuthRequiredError(s.agentID, s.activeTaskID, e.Status.Message) |
| 97 | } |
| 98 | if e.Status.State == a2a.TaskStateInputRequired { |
| 99 | if err := EmitConfirmation(s.conversationID, s.toolCallID, s.activeTaskID, e, emittedArtifactKeys(s.emittedArtifact), s.o); err != nil { |
| 100 | return false, err |
| 101 | } |
| 102 | return true, nil |
| 103 | } |
| 104 | if e.Status.State.Terminal() { |
| 105 | s.terminal = true |
| 106 | } |
| 107 | return false, nil |
| 108 | |
| 109 | case *a2a.TaskStatusUpdateEvent: |
| 110 | if e.TaskID != "" { |
| 111 | s.activeTaskID = e.TaskID |
| 112 | } |
| 113 | // Surface any human-readable status message to the client - EXCEPT |