MCPcopy
hub / github.com/prometheus/prometheus / FlushWAL

Method FlushWAL

tsdb/db.go:565–622  ·  view source on GitHub ↗

FlushWAL creates a new block containing all data that's currently in the memory buffer/WAL. Samples that are in existing blocks will not be written to the new block. Note that if the read only database is running concurrently with a writable database then writing the WAL to the database directory can race.

(dir string)

Source from the content-addressed store, hash-verified

563// Note that if the read only database is running concurrently with a
564// writable database then writing the WAL to the database directory can race.
565func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
566 blockReaders, err := db.Blocks()
567 if err != nil {
568 return fmt.Errorf("read blocks: %w", err)
569 }
570 maxBlockTime := int64(math.MinInt64)
571 if len(blockReaders) > 0 {
572 maxBlockTime = blockReaders[len(blockReaders)-1].Meta().MaxTime
573 }
574 w, err := wlog.Open(db.logger, filepath.Join(db.dir, "wal"))
575 if err != nil {
576 return err
577 }
578 var wbl *wlog.WL
579 wblDir := filepath.Join(db.dir, wlog.WblDirName)
580 if _, err := os.Stat(wblDir); !os.IsNotExist(err) {
581 wbl, err = wlog.Open(db.logger, wblDir)
582 if err != nil {
583 return err
584 }
585 }
586 opts := DefaultHeadOptions()
587 opts.ChunkDirRoot = db.dir
588 head, err := NewHead(nil, db.logger, w, wbl, opts, NewHeadStats())
589 if err != nil {
590 return err
591 }
592 defer func() {
593 if err := head.Close(); err != nil {
594 returnErr = errors.Join(returnErr, fmt.Errorf("closing Head: %w", err))
595 }
596 }()
597 // Set the min valid time for the ingested wal samples
598 // to be no lower than the maxt of the last block.
599 if err := head.Init(maxBlockTime); err != nil {
600 return fmt.Errorf("read WAL: %w", err)
601 }
602 mint := head.MinTime()
603 maxt := head.MaxTime()
604 rh := NewRangeHead(head, mint, maxt)
605 compactor, err := NewLeveledCompactor(
606 context.Background(),
607 nil,
608 db.logger,
609 ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5),
610 chunkenc.NewPool(), nil,
611 )
612 if err != nil {
613 return fmt.Errorf("create leveled compactor: %w", err)
614 }
615 // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
616 // Because of this block intervals are always +1 than the total samples it includes.
617 _, err = compactor.Write(dir, rh, mint, maxt+1, nil)
618 if err != nil {
619 return fmt.Errorf("writing WAL: %w", err)
620 }
621 return nil
622}

Callers 2

TestDBReadOnly_FlushWALFunction · 0.80

Calls 15

BlocksMethod · 0.95
CloseMethod · 0.95
InitMethod · 0.95
MinTimeMethod · 0.95
MaxTimeMethod · 0.95
WriteMethod · 0.95
OpenFunction · 0.92
NewPoolFunction · 0.92
DefaultHeadOptionsFunction · 0.85
NewHeadFunction · 0.85
NewHeadStatsFunction · 0.85
NewRangeHeadFunction · 0.85

Tested by 2

TestDBReadOnly_FlushWALFunction · 0.64