MCPcopy
hub / github.com/benbjohnson/litestream / sync

Method sync

db.go:1807–2061  ·  view source on GitHub ↗

sync copies pending bytes from the real WAL to LTX. Returns synced=true if an LTX file was created (i.e., there were new pages to sync).

(ctx context.Context, checkpointing bool, exec *syncExecutor, info syncInfo)

Source from the content-addressed store, hash-verified

1805// sync copies pending bytes from the real WAL to LTX.
1806// Returns synced=true if an LTX file was created (i.e., there were new pages to sync).
1807func (db *DB) sync(ctx context.Context, checkpointing bool, exec *syncExecutor, info syncInfo) (result syncResult, err error) {
1808 result.newWALSize = exec.state.lastSyncedWALOffset
1809 result.syncedToWALEnd = exec.state.syncedToWALEnd
1810 if info.clearSyncedToWALEnd {
1811 result.syncedToWALEnd = false
1812 }
1813
1814 // Determine the next sequential transaction ID.
1815 txID := exec.pos.TXID + 1
1816 db.setSyncDiagPhase(diagPhaseSyncOpenLTX,
1817 func(s *diagState) {
1818 s.txID = txID
1819 s.snapshotting = info.snapshotting
1820 s.reason = info.reason
1821 })
1822
1823 filename := db.LTXPath(0, txID, txID)
1824
1825 logArgs := []any{
1826 "txid", txID.String(),
1827 "offset", info.offset,
1828 }
1829 if checkpointing {
1830 logArgs = append(logArgs, "chkpt", "true")
1831 }
1832 if info.snapshotting {
1833 logArgs = append(logArgs, "snap", "true")
1834 }
1835 if info.reason != "" {
1836 logArgs = append(logArgs, "reason", info.reason)
1837 }
1838 db.Logger.Debug("sync", logArgs...)
1839
1840 // Prevent internal checkpoints during sync. Ignore if already in a checkpoint.
1841 if !checkpointing {
1842 db.chkMu.RLock()
1843 defer db.chkMu.RUnlock()
1844 }
1845
1846 fi, err := db.f.Stat()
1847 if err != nil {
1848 return result, err
1849 }
1850 mode := fi.Mode()
1851 commit := uint32(fi.Size() / int64(db.pageSize))
1852
1853 walFile, err := os.Open(db.WALPath())
1854 if err != nil {
1855 return result, err
1856 }
1857 defer walFile.Close()
1858
1859 walReaderLogger := db.Logger.With(LogKeySubsystem, LogSubsystemWALReader)
1860 var rd *WALReader
1861 if info.offset == WALHeaderSize {
1862 if rd, err = NewWALReader(walFile, walReaderLogger); err != nil {
1863 return result, fmt.Errorf("new wal reader: %w", err)
1864 }

Callers 1

Calls 15

setSyncDiagPhase (Method) · 0.95
LTXPath (Method) · 0.95
WALPath (Method) · 0.95
PageMap (Method) · 0.95
writeLTXFromDB (Method) · 0.95
writeLTXFromWAL (Method) · 0.95
invalidatePosCache (Method) · 0.95
walFileSize (Method) · 0.95
MkdirAll (Function) · 0.92
Fileinfo (Function) · 0.92
NewWALReader (Function) · 0.85
NewWALReaderWithOffset (Function) · 0.85

Tested by

no test coverage detected