FlushWAL creates a new block containing all data that's currently in the memory buffer/WAL. Samples that are in existing blocks will not be written to the new block. Note that if the read only database is running concurrently with a writable database then writing the WAL to the database directory can race.
(dir string)
| 563 | // Note that if the read only database is running concurrently with a |
| 564 | // writable database then writing the WAL to the database directory can race. |
| 565 | func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) { |
| 566 | blockReaders, err := db.Blocks() |
| 567 | if err != nil { |
| 568 | return fmt.Errorf("read blocks: %w", err) |
| 569 | } |
| 570 | maxBlockTime := int64(math.MinInt64) |
| 571 | if len(blockReaders) > 0 { |
| 572 | maxBlockTime = blockReaders[len(blockReaders)-1].Meta().MaxTime |
| 573 | } |
| 574 | w, err := wlog.Open(db.logger, filepath.Join(db.dir, "wal")) |
| 575 | if err != nil { |
| 576 | return err |
| 577 | } |
| 578 | var wbl *wlog.WL |
| 579 | wblDir := filepath.Join(db.dir, wlog.WblDirName) |
| 580 | if _, err := os.Stat(wblDir); !os.IsNotExist(err) { |
| 581 | wbl, err = wlog.Open(db.logger, wblDir) |
| 582 | if err != nil { |
| 583 | return err |
| 584 | } |
| 585 | } |
| 586 | opts := DefaultHeadOptions() |
| 587 | opts.ChunkDirRoot = db.dir |
| 588 | head, err := NewHead(nil, db.logger, w, wbl, opts, NewHeadStats()) |
| 589 | if err != nil { |
| 590 | return err |
| 591 | } |
| 592 | defer func() { |
| 593 | if err := head.Close(); err != nil { |
| 594 | returnErr = errors.Join(returnErr, fmt.Errorf("closing Head: %w", err)) |
| 595 | } |
| 596 | }() |
| 597 | // Set the min valid time for the ingested wal samples |
| 598 | // to be no lower than the maxt of the last block. |
| 599 | if err := head.Init(maxBlockTime); err != nil { |
| 600 | return fmt.Errorf("read WAL: %w", err) |
| 601 | } |
| 602 | mint := head.MinTime() |
| 603 | maxt := head.MaxTime() |
| 604 | rh := NewRangeHead(head, mint, maxt) |
| 605 | compactor, err := NewLeveledCompactor( |
| 606 | context.Background(), |
| 607 | nil, |
| 608 | db.logger, |
| 609 | ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5), |
| 610 | chunkenc.NewPool(), nil, |
| 611 | ) |
| 612 | if err != nil { |
| 613 | return fmt.Errorf("create leveled compactor: %w", err) |
| 614 | } |
| 615 | // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). |
| 616 | // Because of this block intervals are always +1 than the total samples it includes. |
| 617 | _, err = compactor.Write(dir, rh, mint, maxt+1, nil) |
| 618 | if err != nil { |
| 619 | return fmt.Errorf("writing WAL: %w", err) |
| 620 | } |
| 621 | return nil |
| 622 | } |