`syncToRemoteWithLock()` syncs dirty pages to the remote replica. The caller must hold `f.mu`.
| 1898 | // syncToRemoteWithLock syncs dirty pages to the remote replica. |
| 1899 | // Caller must hold f.mu. |
| 1900 | func (f *VFSFile) syncToRemoteWithLock() error { |
| 1901 | // Double-check dirty pages exist |
| 1902 | if len(f.dirty) == 0 { |
| 1903 | return nil |
| 1904 | } |
| 1905 | |
| 1906 | ctx := f.ctx |
| 1907 | |
| 1908 | // Check for conflicts |
| 1909 | if err := f.checkForConflict(ctx); err != nil { |
| 1910 | return err |
| 1911 | } |
| 1912 | |
| 1913 | // Create LTX file from dirty pages |
| 1914 | ltxReader := f.createLTXFromDirty() |
| 1915 | |
| 1916 | // Upload LTX file to remote |
| 1917 | info, err := f.client.WriteLTXFile(ctx, 0, f.pendingTXID, f.pendingTXID, ltxReader) |
| 1918 | if err != nil { |
| 1919 | return fmt.Errorf("upload LTX: %w", err) |
| 1920 | } |
| 1921 | |
| 1922 | f.logger.Info("synced to remote", |
| 1923 | "txid", info.MaxTXID, |
| 1924 | "pages", len(f.dirty), |
| 1925 | "size", info.Size) |
| 1926 | |
| 1927 | f.expectedTXID = f.pendingTXID |
| 1928 | f.pendingTXID++ |
| 1929 | f.pos = ltx.Pos{TXID: f.expectedTXID} |
| 1930 | |
| 1931 | if f.vfs != nil { |
| 1932 | f.vfs.writeMu.Lock() |
| 1933 | if f.expectedTXID > f.vfs.lastSyncedTXID { |
| 1934 | f.vfs.lastSyncedTXID = f.expectedTXID |
| 1935 | } |
| 1936 | f.vfs.writeMu.Unlock() |
| 1937 | } |
| 1938 | |
| 1939 | // Update cache with synced pages (index will be populated naturally when pages are fetched) |
| 1940 | for pgno, bufferOff := range f.dirty { |
| 1941 | cachedData := make([]byte, f.pageSize) |
| 1942 | if _, err := f.bufferFile.ReadAt(cachedData, bufferOff); err != nil { |
| 1943 | return fmt.Errorf("read page %d from buffer for cache: %w", pgno, err) |
| 1944 | } |
| 1945 | f.cache.Add(pgno, cachedData) |
| 1946 | } |
| 1947 | |
| 1948 | // Apply synced pages to hydrated file if hydration is complete |
| 1949 | // Must be done before clearing f.dirty since we need the page offsets |
| 1950 | if f.hydrator != nil && f.hydrator.Complete() { |
| 1951 | if err := f.applySyncedPagesToHydratedFile(); err != nil { |
| 1952 | f.logger.Error("failed to apply synced pages to hydrated file", "error", err) |
| 1953 | // Don't fail the sync - hydration will catch up on next poll |
| 1954 | } |
| 1955 | } |
| 1956 | |
| 1957 | // Clear dirty pages |
no test coverage detected