CompactDB performs a compaction or snapshot for a given database on a single destination level. This function will only proceed if a compaction has not occurred before the last compaction time.
(ctx context.Context, db *DB, lvl *CompactionLevel)
| 743 | // CompactDB performs a compaction or snapshot for a given database on a single destination level. |
| 744 | // This function will only proceed if a compaction has not occurred before the last compaction time. |
| 745 | func (s *Store) CompactDB(ctx context.Context, db *DB, lvl *CompactionLevel) (*ltx.FileInfo, error) { |
| 746 | // Skip if database is not yet initialized (page size unknown). |
| 747 | if db.PageSize() == 0 { |
| 748 | return nil, &DBNotReadyError{Reason: "page size not initialized"} |
| 749 | } |
| 750 | |
| 751 | dstLevel := lvl.Level |
| 752 | |
| 753 | // Ensure we are not re-compacting before the most recent compaction time. |
| 754 | prevCompactionAt := lvl.PrevCompactionAt(time.Now()) |
| 755 | dstInfo, err := db.MaxLTXFileInfo(ctx, dstLevel) |
| 756 | if err != nil { |
| 757 | return nil, fmt.Errorf("fetch dst level info: %w", err) |
| 758 | } else if dstInfo.CreatedAt.After(prevCompactionAt) { |
| 759 | return nil, ErrCompactionTooEarly |
| 760 | } |
| 761 | |
| 762 | // Shortcut if this is a snapshot since we are not pulling from a previous level. |
| 763 | if dstLevel == SnapshotLevel { |
| 764 | info, err := db.Snapshot(ctx) |
| 765 | if err != nil { |
| 766 | return info, err |
| 767 | } |
| 768 | db.Logger.InfoContext(ctx, "snapshot complete", "txid", info.MaxTXID.String(), "size", info.Size) |
| 769 | return info, nil |
| 770 | } |
| 771 | |
| 772 | // Fetch latest LTX files for both the source & destination so we can see if we need to make progress. |
| 773 | srcLevel := s.levels.PrevLevel(dstLevel) |
| 774 | srcInfo, err := db.MaxLTXFileInfo(ctx, srcLevel) |
| 775 | if err != nil { |
| 776 | return nil, fmt.Errorf("fetch src level info: %w", err) |
| 777 | } |
| 778 | |
| 779 | // Skip if there are no new files to compact. |
| 780 | if srcInfo.MaxTXID <= dstInfo.MinTXID { |
| 781 | return nil, ErrNoCompaction |
| 782 | } |
| 783 | |
| 784 | info, err := db.Compact(ctx, dstLevel) |
| 785 | if err != nil { |
| 786 | return info, err |
| 787 | } |
| 788 | |
| 789 | db.Logger.InfoContext(ctx, "compaction complete", |
| 790 | "level", dstLevel, |
| 791 | slog.Group("txid", |
| 792 | "min", info.MinTXID.String(), |
| 793 | "max", info.MaxTXID.String(), |
| 794 | ), |
| 795 | "size", info.Size, |
| 796 | ) |
| 797 | |
| 798 | return info, nil |
| 799 | } |
| 800 | |
| 801 | // EnforceSnapshotRetention removes old snapshots by timestamp and then |
| 802 | // cleans up all lower levels based on minimum snapshot TXID. |