sync copies pending bytes from the real WAL to LTX. Returns synced=true if an LTX file was created (i.e., there were new pages to sync).
(ctx context.Context, checkpointing bool, exec *syncExecutor, info syncInfo)
| 1805 | // sync copies pending bytes from the real WAL to LTX. |
| 1806 | // Returns synced=true if an LTX file was created (i.e., there were new pages to sync). |
| 1807 | func (db *DB) sync(ctx context.Context, checkpointing bool, exec *syncExecutor, info syncInfo) (result syncResult, err error) { |
| 1808 | result.newWALSize = exec.state.lastSyncedWALOffset |
| 1809 | result.syncedToWALEnd = exec.state.syncedToWALEnd |
| 1810 | if info.clearSyncedToWALEnd { |
| 1811 | result.syncedToWALEnd = false |
| 1812 | } |
| 1813 | |
| 1814 | // Determine the next sequential transaction ID. |
| 1815 | txID := exec.pos.TXID + 1 |
| 1816 | db.setSyncDiagPhase(diagPhaseSyncOpenLTX, |
| 1817 | func(s *diagState) { |
| 1818 | s.txID = txID |
| 1819 | s.snapshotting = info.snapshotting |
| 1820 | s.reason = info.reason |
| 1821 | }) |
| 1822 | |
| 1823 | filename := db.LTXPath(0, txID, txID) |
| 1824 | |
| 1825 | logArgs := []any{ |
| 1826 | "txid", txID.String(), |
| 1827 | "offset", info.offset, |
| 1828 | } |
| 1829 | if checkpointing { |
| 1830 | logArgs = append(logArgs, "chkpt", "true") |
| 1831 | } |
| 1832 | if info.snapshotting { |
| 1833 | logArgs = append(logArgs, "snap", "true") |
| 1834 | } |
| 1835 | if info.reason != "" { |
| 1836 | logArgs = append(logArgs, "reason", info.reason) |
| 1837 | } |
| 1838 | db.Logger.Debug("sync", logArgs...) |
| 1839 | |
| 1840 | // Prevent internal checkpoints during sync by holding the checkpoint read lock. The lock is not taken when checkpointing is true (already in a checkpoint). |
| 1841 | if !checkpointing { |
| 1842 | db.chkMu.RLock() |
| 1843 | defer db.chkMu.RUnlock() |
| 1844 | } |
| 1845 | |
| 1846 | fi, err := db.f.Stat() |
| 1847 | if err != nil { |
| 1848 | return result, err |
| 1849 | } |
| 1850 | mode := fi.Mode() |
| 1851 | commit := uint32(fi.Size() / int64(db.pageSize)) |
| 1852 | |
| 1853 | walFile, err := os.Open(db.WALPath()) |
| 1854 | if err != nil { |
| 1855 | return result, err |
| 1856 | } |
| 1857 | defer walFile.Close() |
| 1858 | |
| 1859 | walReaderLogger := db.Logger.With(LogKeySubsystem, LogSubsystemWALReader) |
| 1860 | var rd *WALReader |
| 1861 | if info.offset == WALHeaderSize { |
| 1862 | if rd, err = NewWALReader(walFile, walReaderLogger); err != nil { |
| 1863 | return result, fmt.Errorf("new wal reader: %w", err) |
| 1864 | } |
no test coverage detected