MCPcopy
hub / github.com/kenn-io/msgvault / GetOrCreateSource

Method GetOrCreateSource

internal/store/sync.go:578–620  ·  view source on GitHub ↗

GetOrCreateSource gets or creates a source by type and identifier. Concurrent first-inserts converge via INSERT ... ON CONFLICT DO UPDATE RETURNING: the no-op SET fires RETURNING on both the insert and conflict path so the second caller receives the existing row's fields instead of a unique-violation error.

(sourceType, identifier string)

Source from the content-addressed store, hash-verified

576// conflict path so the second caller receives the existing row's
577// fields instead of a unique-violation error.
578func (s *Store) GetOrCreateSource(sourceType, identifier string) (*Source, error) {
579 now := s.dialect.Now()
580 row := s.db.QueryRow(fmt.Sprintf(`
581 INSERT INTO sources (source_type, identifier, created_at, updated_at)
582 VALUES (?, ?, %s, %s)
583 ON CONFLICT (source_type, identifier) DO UPDATE
584 SET identifier = sources.identifier
585 RETURNING id, source_type, identifier, display_name, google_user_id,
586 last_sync_at, sync_cursor, sync_config, oauth_app,
587 created_at, updated_at
588 `, now, now), sourceType, identifier)
589
590 source, err := scanSource(row)
591 if err != nil {
592 return nil, fmt.Errorf("upsert source: %w", err)
593 }
594
595 // Add to the default "All" collection if it exists.
596 //
597 // This runs as a separate Exec rather than inside a transaction
598 // with the source insert. If this Exec fails, the source row is
599 // committed but the All membership is missing — and the next
600 // EnsureDefaultCollection call (which runs in InitSchema on every
601 // process launch) re-adds every source not yet linked. Self-heals
602 // on next CLI invocation; until then collection-scoped reads of
603 // All would miss this source. Acceptable for a single-user tool;
604 // a future refactor can fold this into a withTx.
605 if _, err := s.db.Exec(
606 s.dialect.InsertOrIgnore(
607 `INSERT OR IGNORE INTO collection_sources (collection_id, source_id)
608 SELECT id, ? FROM collections WHERE name = ?`,
609 ),
610 source.ID, DefaultCollectionName,
611 ); err != nil {
612 slog.Warn("failed to add source to default collection (self-heals on next InitSchema)",
613 "source_id", source.ID,
614 "identifier", identifier,
615 "error", err,
616 )
617 }
618
619 return source, nil
620}
621
622// UpdateSourceSyncCursor updates the sync cursor (historyId) for a source.
623func (s *Store) UpdateSourceSyncCursor(sourceID int64, cursor string) error {

Calls 6

scanSourceFunction · 0.85
NowMethod · 0.65
QueryRowMethod · 0.65
ExecMethod · 0.65
InsertOrIgnoreMethod · 0.65
ErrorfMethod · 0.45