open returns a new DB in the given directory. It initializes the lockfile, WAL, compactor, and Head (by replaying the WAL), and runs the database. It is not safe to open more than one DB in the same directory.
(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rngs []int64, stats *DBStats)
| 948 | // It initializes the lockfile, WAL, compactor, and Head (by replaying the WAL), and runs the database. |
| 949 | // It is not safe to open more than one DB in the same directory. |
| 950 | func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rngs []int64, stats *DBStats) (_ *DB, returnedErr error) { |
| 951 | if err := os.MkdirAll(dir, 0o777); err != nil { |
| 952 | return nil, err |
| 953 | } |
| 954 | if l == nil { |
| 955 | l = promslog.NewNopLogger() |
| 956 | } |
| 957 | if stats == nil { |
| 958 | stats = NewDBStats() |
| 959 | } |
| 960 | |
| 961 | for i, v := range rngs { |
| 962 | if v > opts.MaxBlockDuration { |
| 963 | rngs = rngs[:i] |
| 964 | break |
| 965 | } |
| 966 | } |
| 967 | |
| 968 | // Fixup bad format written by Prometheus 2.1. |
| 969 | if err := repairBadIndexVersion(l, dir); err != nil { |
| 970 | return nil, fmt.Errorf("repair bad index version: %w", err) |
| 971 | } |
| 972 | |
| 973 | walDir := filepath.Join(dir, "wal") |
| 974 | wblDir := filepath.Join(dir, wlog.WblDirName) |
| 975 | |
| 976 | for _, tmpDir := range []string{walDir, dir} { |
| 977 | // Remove tmp dirs. |
| 978 | if err := tsdbutil.RemoveTmpDirs(l, tmpDir, isTmpDir); err != nil { |
| 979 | return nil, fmt.Errorf("remove tmp dirs: %w", err) |
| 980 | } |
| 981 | // Remove any temporary checkpoints that might have been interrupted during creation. |
| 982 | if err := wlog.DeleteTempCheckpoints(l, tmpDir); err != nil { |
| 983 | return nil, fmt.Errorf("delete temp checkpoints: %w", err) |
| 984 | } |
| 985 | } |
| 986 | |
| 987 | db := &DB{ |
| 988 | dir: dir, |
| 989 | logger: l, |
| 990 | opts: opts, |
| 991 | compactc: make(chan struct{}, 1), |
| 992 | donec: make(chan struct{}), |
| 993 | stopc: make(chan struct{}), |
| 994 | autoCompact: true, |
| 995 | chunkPool: chunkenc.NewPool(), |
| 996 | blocksToDelete: opts.BlocksToDelete, |
| 997 | registerer: r, |
| 998 | } |
| 999 | defer func() { |
| 1000 | // Close files if startup fails somewhere. |
| 1001 | if returnedErr == nil { |
| 1002 | return |
| 1003 | } |
| 1004 | |
| 1005 | close(db.donec) // DB is never run if it was an error, so close this channel here. |
| 1006 | if err := db.Close(); err != nil { |
| 1007 | returnedErr = errors.Join(returnedErr, fmt.Errorf("close DB after failed startup: %w", err)) |
searching dependent graphs…