MCPcopy
hub / github.com/prometheus/prometheus / PopulateBlock

Method PopulateBlock

tsdb/compact.go:793–939  ·  view source on GitHub ↗

PopulateBlock fills the index and chunk writers with new data gathered as the union of the provided blocks. It returns meta information for the new block. It expects the input blocks to be sorted by mint (minimum time).

(ctx context.Context, metrics *CompactorMetrics, logger *slog.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter, postingsFunc IndexReaderPostingsFunc)

Source from the content-addressed store, hash-verified

791// of the provided blocks. It returns meta information for the new block.
792// It expects sorted blocks input by mint.
793func (DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger *slog.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter, postingsFunc IndexReaderPostingsFunc) (err error) {
794 if len(blocks) == 0 {
795 return errors.New("cannot populate block from no readers")
796 }
797
798 var (
799 sets []storage.ChunkSeriesSet
800 symbols index.StringIter
801 closers []io.Closer
802 overlapping bool
803 )
804 defer func() {
805 if cerr := closeAll(closers); cerr != nil {
806 err = errors.Join(err, fmt.Errorf("close: %w", cerr))
807 }
808 metrics.PopulatingBlocks.Set(0)
809 }()
810 metrics.PopulatingBlocks.Set(1)
811
812 globalMaxt := blocks[0].Meta().MaxTime
813 for i, b := range blocks {
814 select {
815 case <-ctx.Done():
816 return ctx.Err()
817 default:
818 }
819
820 if !overlapping {
821 if i > 0 && b.Meta().MinTime < globalMaxt {
822 metrics.OverlappingBlocks.Inc()
823 overlapping = true
824 logger.Info("Found overlapping blocks during compaction", "ulid", meta.ULID)
825 }
826 if b.Meta().MaxTime > globalMaxt {
827 globalMaxt = b.Meta().MaxTime
828 }
829 }
830
831 indexr, err := b.Index()
832 if err != nil {
833 return fmt.Errorf("open index reader for block %+v: %w", b.Meta(), err)
834 }
835 closers = append(closers, indexr)
836
837 chunkr, err := b.Chunks()
838 if err != nil {
839 return fmt.Errorf("open chunk reader for block %+v: %w", b.Meta(), err)
840 }
841 closers = append(closers, chunkr)
842
843 tombsr, err := b.Tombstones()
844 if err != nil {
845 return fmt.Errorf("open tombstone reader for block %+v: %w", b.Meta(), err)
846 }
847 closers = append(closers, tombsr)
848
849 postings := postingsFunc(ctx, indexr)
850 // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.

Callers 1

Calls 15

SeriesRef · TypeAlias · 0.92
NewMergeChunkSeriesSet · Function · 0.92
NewBlockChunkSeriesSet · Function · 0.85
NewMergedStringIter · Function · 0.85
Inc · Method · 0.80
closeAll · Function · 0.70
Set · Method · 0.65
Meta · Method · 0.65
Done · Method · 0.65
Err · Method · 0.65
Index · Method · 0.65
Chunks · Method · 0.65

Tested by 1