Init loads data from the write-ahead log and prepares the head for writes. It should be called before using an appender so that ingested samples are limited to the head's min valid time.
(minValidTime int64)
| 692 | // It should be called before using an appender so that it |
| 693 | // limits the ingested samples to the head min valid time. |
| 694 | func (h *Head) Init(minValidTime int64) error { |
| 695 | h.minValidTime.Store(minValidTime) |
| 696 | defer h.resetWLReplayResources() |
| 697 | defer func() { |
| 698 | h.postings.EnsureOrder(h.opts.WALReplayConcurrency) |
| 699 | }() |
| 700 | defer h.gc() // After loading the wal remove the obsolete data from the head. |
| 701 | defer func() { |
| 702 | // Loading of m-mapped chunks and snapshot can make the mint of the Head |
| 703 | // to go below minValidTime. |
| 704 | if h.MinTime() < h.minValidTime.Load() { |
| 705 | h.minTime.Store(h.minValidTime.Load()) |
| 706 | } |
| 707 | }() |
| 708 | |
| 709 | h.logger.Info("Replaying on-disk memory mappable chunks if any") |
| 710 | start := time.Now() |
| 711 | |
| 712 | snapIdx, snapOffset := -1, 0 |
| 713 | refSeries := make(map[chunks.HeadSeriesRef]*memSeries) |
| 714 | |
| 715 | snapshotLoaded := false |
| 716 | var chunkSnapshotLoadDuration time.Duration |
| 717 | if h.opts.EnableMemorySnapshotOnShutdown { |
| 718 | h.logger.Info("Chunk snapshot is enabled, replaying from the snapshot") |
| 719 | // If there are any WAL files, there should be at least one WAL file with an index that is current or newer |
| 720 | // than the snapshot index. If the WAL index is behind the snapshot index somehow, the snapshot is assumed |
| 721 | // to be outdated. |
| 722 | loadSnapshot := true |
| 723 | if h.wal != nil { |
| 724 | _, endAt, err := wlog.Segments(h.wal.Dir()) |
| 725 | if err != nil { |
| 726 | return fmt.Errorf("finding WAL segments: %w", err) |
| 727 | } |
| 728 | |
| 729 | _, idx, _, err := LastChunkSnapshot(h.opts.ChunkDirRoot) |
| 730 | if err != nil && !errors.Is(err, record.ErrNotFound) { |
| 731 | h.logger.Error("Could not find last snapshot", "err", err) |
| 732 | } |
| 733 | |
| 734 | if err == nil && endAt < idx { |
| 735 | loadSnapshot = false |
| 736 | h.logger.Warn("Last WAL file is behind snapshot, removing snapshots") |
| 737 | if err := DeleteChunkSnapshots(h.opts.ChunkDirRoot, math.MaxInt, math.MaxInt); err != nil { |
| 738 | h.logger.Error("Error while deleting snapshot directories", "err", err) |
| 739 | } |
| 740 | } |
| 741 | } |
| 742 | if loadSnapshot { |
| 743 | var err error |
| 744 | snapIdx, snapOffset, refSeries, err = h.loadChunkSnapshot() |
| 745 | if err == nil { |
| 746 | snapshotLoaded = true |
| 747 | chunkSnapshotLoadDuration = time.Since(start) |
| 748 | h.logger.Info("Chunk snapshot loading time", "duration", chunkSnapshotLoadDuration.String()) |
| 749 | } else { |
| 750 | snapIdx, snapOffset = -1, 0 |
| 751 | refSeries = make(map[chunks.HeadSeriesRef]*memSeries) |