GetOrCreateSource gets or creates a source by type and identifier. Concurrent first-inserts converge via INSERT ... ON CONFLICT DO UPDATE RETURNING: the no-op SET fires RETURNING on both the insert and conflict path so the second caller receives the existing row's fields instead of a unique-violatio
(sourceType, identifier string)
| 576 | // conflict path so the second caller receives the existing row's |
| 577 | // fields instead of a unique-violation error. |
| 578 | func (s *Store) GetOrCreateSource(sourceType, identifier string) (*Source, error) { |
| 579 | now := s.dialect.Now() |
| 580 | row := s.db.QueryRow(fmt.Sprintf(` |
| 581 | INSERT INTO sources (source_type, identifier, created_at, updated_at) |
| 582 | VALUES (?, ?, %s, %s) |
| 583 | ON CONFLICT (source_type, identifier) DO UPDATE |
| 584 | SET identifier = sources.identifier |
| 585 | RETURNING id, source_type, identifier, display_name, google_user_id, |
| 586 | last_sync_at, sync_cursor, sync_config, oauth_app, |
| 587 | created_at, updated_at |
| 588 | `, now, now), sourceType, identifier) |
| 589 | |
| 590 | source, err := scanSource(row) |
| 591 | if err != nil { |
| 592 | return nil, fmt.Errorf("upsert source: %w", err) |
| 593 | } |
| 594 | |
| 595 | // Add to the default "All" collection if it exists. |
| 596 | // |
| 597 | // This runs as a separate Exec rather than inside a transaction |
| 598 | // with the source insert. If this Exec fails, the source row is |
| 599 | // committed but the All membership is missing — and the next |
| 600 | // EnsureDefaultCollection call (which runs in InitSchema on every |
| 601 | // process launch) re-adds every source not yet linked. Self-heals |
| 602 | // on next CLI invocation; until then collection-scoped reads of |
| 603 | // All would miss this source. Acceptable for a single-user tool; |
| 604 | // a future refactor can fold this into a withTx. |
| 605 | if _, err := s.db.Exec( |
| 606 | s.dialect.InsertOrIgnore( |
| 607 | `INSERT OR IGNORE INTO collection_sources (collection_id, source_id) |
| 608 | SELECT id, ? FROM collections WHERE name = ?`, |
| 609 | ), |
| 610 | source.ID, DefaultCollectionName, |
| 611 | ); err != nil { |
| 612 | slog.Warn("failed to add source to default collection (self-heals on next InitSchema)", |
| 613 | "source_id", source.ID, |
| 614 | "identifier", identifier, |
| 615 | "error", err, |
| 616 | ) |
| 617 | } |
| 618 | |
| 619 | return source, nil |
| 620 | } |
| 621 | |
| 622 | // UpdateSourceSyncCursor updates the sync cursor (historyId) for a source. |
| 623 | func (s *Store) UpdateSourceSyncCursor(sourceID int64, cursor string) error { |