MCPcopy
hub / github.com/docker/docker-agent / OnEvent

Method OnEvent

pkg/runtime/persistence_observer.go:74–158  ·  view source on GitHub ↗

OnEvent applies the per-event-type persistence rules. When the session is itself a sub-session, every event is skipped (the parent absorbs the sub-session when its SubSessionCompletedEvent arrives). Any [SessionScoped] event tagged with a different session ID (forwarded sub-agent streaming events) is also filtered out so that it cannot pollute the parent's transcript.

(ctx context.Context, sess *session.Session, event Event)

Source from the content-addressed store, hash-verified

72// (forwarded sub-agent streaming events) is filtered out so it can't
73// pollute the parent's transcript.
74func (p *PersistenceObserver) OnEvent(ctx context.Context, sess *session.Session, event Event) {
75 if sess.IsSubSession() {
76 return
77 }
78 if scoped, ok := event.(SessionScoped); ok && scoped.GetSessionID() != sess.ID {
79 return
80 }
81
82 switch e := event.(type) {
83 case *AgentChoiceEvent:
84 p.streaming.content.WriteString(e.Content)
85 p.streaming.agentName = e.AgentName
86 p.persistStreamingContent(ctx, sess.ID)
87
88 case *AgentChoiceReasoningEvent:
89 p.streaming.reasoningContent.WriteString(e.Content)
90 p.streaming.agentName = e.AgentName
91 p.persistStreamingContent(ctx, sess.ID)
92
93 case *UserMessageEvent:
94 p.streaming = streamingState{}
95 if _, err := p.store.AddMessage(ctx, e.SessionID, session.UserMessage(e.Message, e.MultiContent...)); err != nil {
96 slog.WarnContext(ctx, "Failed to persist user message", "session_id", e.SessionID, "error", err)
97 }
98
99 case *MessageAddedEvent:
100 // Finalise the streaming row (if any) with the canonical
101 // MessageAddedEvent payload, then reset for the next stream.
102 var err error
103 if p.streaming.messageID != 0 {
104 err = p.store.UpdateMessage(ctx, p.streaming.messageID, e.Message)
105 } else {
106 _, err = p.store.AddMessage(ctx, e.SessionID, e.Message)
107 }
108 if err != nil {
109 slog.WarnContext(ctx, "Failed to persist message",
110 "session_id", e.SessionID, "message_id", p.streaming.messageID, "error", err)
111 }
112 p.streaming = streamingState{}
113
114 case *SubSessionCompletedEvent:
115 if subSess, ok := e.SubSession.(*session.Session); ok {
116 if err := p.store.AddSubSession(ctx, e.ParentSessionID, subSess); err != nil {
117 slog.WarnContext(ctx, "Failed to persist sub-session", "parent_id", e.ParentSessionID, "error", err)
118 }
119 }
120
121 case *SessionSummaryEvent:
122 if err := p.store.AddSummary(ctx, e.SessionID, e.Summary, e.FirstKeptEntry); err != nil {
123 slog.WarnContext(ctx, "Failed to persist summary", "session_id", e.SessionID, "error", err)
124 }
125
126 case *TokenUsageEvent:
127 if e.Usage != nil {
128 if err := p.store.UpdateSessionTokens(ctx, sess.ID, e.Usage.InputTokens, e.Usage.OutputTokens, e.Usage.Cost); err != nil {
129 slog.WarnContext(ctx, "Failed to persist token usage", "session_id", sess.ID, "error", err)
130 }
131 }

Callers

nothing calls this directly

Calls 13

UserMessageFunction · 0.92
NowMethod · 0.80
GetSessionIDMethod · 0.65
AddMessageMethod · 0.65
UpdateMessageMethod · 0.65
AddSubSessionMethod · 0.65
AddSummaryMethod · 0.65
UpdateSessionTokensMethod · 0.65
UpdateSessionTitleMethod · 0.65
AddErrorMethod · 0.65
IsSubSessionMethod · 0.45

Tested by

no test coverage detected