OnEvent applies the per-event-type persistence rules. Sub-session events are skipped (the parent absorbs them on SubSessionCompleted), and any [SessionScoped] event tagged with a different session id (forwarded sub-agent streaming events) is filtered out so it can't pollute the parent's transcript.
(ctx context.Context, sess *session.Session, event Event)
| 72 | // (forwarded sub-agent streaming events) is filtered out so it can't |
| 73 | // pollute the parent's transcript. |
| 74 | func (p *PersistenceObserver) OnEvent(ctx context.Context, sess *session.Session, event Event) { |
| 75 | if sess.IsSubSession() { |
| 76 | return |
| 77 | } |
| 78 | if scoped, ok := event.(SessionScoped); ok && scoped.GetSessionID() != sess.ID { |
| 79 | return |
| 80 | } |
| 81 | |
| 82 | switch e := event.(type) { |
| 83 | case *AgentChoiceEvent: |
| 84 | p.streaming.content.WriteString(e.Content) |
| 85 | p.streaming.agentName = e.AgentName |
| 86 | p.persistStreamingContent(ctx, sess.ID) |
| 87 | |
| 88 | case *AgentChoiceReasoningEvent: |
| 89 | p.streaming.reasoningContent.WriteString(e.Content) |
| 90 | p.streaming.agentName = e.AgentName |
| 91 | p.persistStreamingContent(ctx, sess.ID) |
| 92 | |
| 93 | case *UserMessageEvent: |
| 94 | p.streaming = streamingState{} |
| 95 | if _, err := p.store.AddMessage(ctx, e.SessionID, session.UserMessage(e.Message, e.MultiContent...)); err != nil { |
| 96 | slog.WarnContext(ctx, "Failed to persist user message", "session_id", e.SessionID, "error", err) |
| 97 | } |
| 98 | |
| 99 | case *MessageAddedEvent: |
| 100 | // Finalise the streaming row (if any) with the canonical |
| 101 | // MessageAddedEvent payload, then reset for the next stream. |
| 102 | var err error |
| 103 | if p.streaming.messageID != 0 { |
| 104 | err = p.store.UpdateMessage(ctx, p.streaming.messageID, e.Message) |
| 105 | } else { |
| 106 | _, err = p.store.AddMessage(ctx, e.SessionID, e.Message) |
| 107 | } |
| 108 | if err != nil { |
| 109 | slog.WarnContext(ctx, "Failed to persist message", |
| 110 | "session_id", e.SessionID, "message_id", p.streaming.messageID, "error", err) |
| 111 | } |
| 112 | p.streaming = streamingState{} |
| 113 | |
| 114 | case *SubSessionCompletedEvent: |
| 115 | if subSess, ok := e.SubSession.(*session.Session); ok { |
| 116 | if err := p.store.AddSubSession(ctx, e.ParentSessionID, subSess); err != nil { |
| 117 | slog.WarnContext(ctx, "Failed to persist sub-session", "parent_id", e.ParentSessionID, "error", err) |
| 118 | } |
| 119 | } |
| 120 | |
| 121 | case *SessionSummaryEvent: |
| 122 | if err := p.store.AddSummary(ctx, e.SessionID, e.Summary, e.FirstKeptEntry); err != nil { |
| 123 | slog.WarnContext(ctx, "Failed to persist summary", "session_id", e.SessionID, "error", err) |
| 124 | } |
| 125 | |
| 126 | case *TokenUsageEvent: |
| 127 | if e.Usage != nil { |
| 128 | if err := p.store.UpdateSessionTokens(ctx, sess.ID, e.Usage.InputTokens, e.Usage.OutputTokens, e.Usage.Cost); err != nil { |
| 129 | slog.WarnContext(ctx, "Failed to persist token usage", "session_id", sess.ID, "error", err) |
| 130 | } |
| 131 | } |
nothing calls this directly
no test coverage detected