Pos returns the current replication position of the database. The result is cached and invalidated when L0 LTX files change.
()
| 557 | // Pos returns the current replication position of the database. |
| 558 | // The result is cached and invalidated when L0 LTX files change. |
| 559 | func (db *DB) Pos() (ltx.Pos, error) { |
| 560 | db.pos.Lock() |
| 561 | defer db.pos.Unlock() |
| 562 | |
| 563 | if db.pos.value != nil { |
| 564 | return *db.pos.value, nil |
| 565 | } |
| 566 | |
| 567 | minTXID, maxTXID, err := db.MaxLTX() |
| 568 | if err != nil { |
| 569 | return ltx.Pos{}, err |
| 570 | } else if minTXID == 0 { |
| 571 | return ltx.Pos{}, nil // no replication yet |
| 572 | } |
| 573 | |
| 574 | ltxPath := db.LTXPath(0, minTXID, maxTXID) |
| 575 | f, err := os.Open(ltxPath) |
| 576 | if err != nil { |
| 577 | return ltx.Pos{}, NewLTXError("open", ltxPath, 0, uint64(minTXID), uint64(maxTXID), err) |
| 578 | } |
| 579 | defer func() { _ = f.Close() }() |
| 580 | |
| 581 | dec := ltx.NewDecoder(f) |
| 582 | if err := dec.Verify(); err != nil { |
| 583 | return ltx.Pos{}, NewLTXError("verify", ltxPath, 0, uint64(minTXID), uint64(maxTXID), fmt.Errorf("%w: %w", ErrLTXCorrupted, err)) |
| 584 | } |
| 585 | |
| 586 | pos := dec.PostApplyPos() |
| 587 | db.pos.value = &pos |
| 588 | |
| 589 | return pos, nil |
| 590 | } |
| 591 | |
| 592 | // invalidatePosCache clears the cached position so the next call to Pos() |
| 593 | // recomputes it from disk. Call this when L0 LTX files are deleted or |