runHydration performs the background hydration process.
(infos []*ltx.FileInfo)
| 1300 | |
| 1301 | // runHydration performs the background hydration process. |
| 1302 | func (f *VFSFile) runHydration(infos []*ltx.FileInfo) { |
| 1303 | defer f.wg.Done() |
| 1304 | |
| 1305 | hydrationTXID := f.hydrator.TXID() |
| 1306 | |
| 1307 | f.mu.Lock() |
| 1308 | currentTXID := f.pos.TXID |
| 1309 | f.mu.Unlock() |
| 1310 | |
| 1311 | if hydrationTXID > 0 && currentTXID >= hydrationTXID { |
| 1312 | f.logger.Debug("resuming hydration from persistent file", "txid", hydrationTXID.String()) |
| 1313 | } else { |
| 1314 | if hydrationTXID > 0 { |
| 1315 | f.logger.Warn("remote TXID regressed, discarding persistent hydration", |
| 1316 | "hydration_txid", hydrationTXID.String(), |
| 1317 | "current_txid", currentTXID.String()) |
| 1318 | if err := f.hydrator.Truncate(0); err != nil { |
| 1319 | f.hydrator.SetErr(err) |
| 1320 | f.logger.Error("hydration truncate failed", "error", err) |
| 1321 | return |
| 1322 | } |
| 1323 | } |
| 1324 | if err := f.hydrator.Restore(f.ctx, infos); err != nil { |
| 1325 | f.hydrator.SetErr(err) |
| 1326 | f.logger.Error("hydration failed", "error", err) |
| 1327 | return |
| 1328 | } |
| 1329 | hydrationTXID = f.hydrator.TXID() |
| 1330 | } |
| 1331 | |
| 1332 | if currentTXID > hydrationTXID { |
| 1333 | if err := f.hydrator.CatchUp(f.ctx, hydrationTXID, currentTXID); err != nil { |
| 1334 | f.hydrator.SetErr(err) |
| 1335 | f.logger.Error("hydration catch-up failed", "error", err) |
| 1336 | return |
| 1337 | } |
| 1338 | } |
| 1339 | |
| 1340 | f.hydrator.SetComplete() |
| 1341 | |
| 1342 | // Clear cache since we'll now read from hydration file |
| 1343 | f.cache.Purge() |
| 1344 | |
| 1345 | f.logger.Debug("hydration complete", "path", f.hydrationPath, "txid", f.hydrator.TXID().String()) |
| 1346 | } |
| 1347 | |
| 1348 | // applySyncedPagesToHydratedFile writes synced dirty pages to the hydrated file. |
| 1349 | // Must be called with f.mu held. |