PopulateBlock fills the index and chunk writers with new data gathered as the union of the provided blocks. It returns meta information for the new block. It expects sorted blocks input by mint.
(ctx context.Context, metrics *CompactorMetrics, logger *slog.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter, postingsFunc IndexReaderPostingsFunc)
| 791 | // of the provided blocks. It returns meta information for the new block. |
| 792 | // It expects sorted blocks input by mint. |
| 793 | func (DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger *slog.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter, postingsFunc IndexReaderPostingsFunc) (err error) { |
| 794 | if len(blocks) == 0 { |
| 795 | return errors.New("cannot populate block from no readers") |
| 796 | } |
| 797 | |
| 798 | var ( |
| 799 | sets []storage.ChunkSeriesSet |
| 800 | symbols index.StringIter |
| 801 | closers []io.Closer |
| 802 | overlapping bool |
| 803 | ) |
| 804 | defer func() { |
| 805 | if cerr := closeAll(closers); cerr != nil { |
| 806 | err = errors.Join(err, fmt.Errorf("close: %w", cerr)) |
| 807 | } |
| 808 | metrics.PopulatingBlocks.Set(0) |
| 809 | }() |
| 810 | metrics.PopulatingBlocks.Set(1) |
| 811 | |
| 812 | globalMaxt := blocks[0].Meta().MaxTime |
| 813 | for i, b := range blocks { |
| 814 | select { |
| 815 | case <-ctx.Done(): |
| 816 | return ctx.Err() |
| 817 | default: |
| 818 | } |
| 819 | |
| 820 | if !overlapping { |
| 821 | if i > 0 && b.Meta().MinTime < globalMaxt { |
| 822 | metrics.OverlappingBlocks.Inc() |
| 823 | overlapping = true |
| 824 | logger.Info("Found overlapping blocks during compaction", "ulid", meta.ULID) |
| 825 | } |
| 826 | if b.Meta().MaxTime > globalMaxt { |
| 827 | globalMaxt = b.Meta().MaxTime |
| 828 | } |
| 829 | } |
| 830 | |
| 831 | indexr, err := b.Index() |
| 832 | if err != nil { |
| 833 | return fmt.Errorf("open index reader for block %+v: %w", b.Meta(), err) |
| 834 | } |
| 835 | closers = append(closers, indexr) |
| 836 | |
| 837 | chunkr, err := b.Chunks() |
| 838 | if err != nil { |
| 839 | return fmt.Errorf("open chunk reader for block %+v: %w", b.Meta(), err) |
| 840 | } |
| 841 | closers = append(closers, chunkr) |
| 842 | |
| 843 | tombsr, err := b.Tombstones() |
| 844 | if err != nil { |
| 845 | return fmt.Errorf("open tombstone reader for block %+v: %w", b.Meta(), err) |
| 846 | } |
| 847 | closers = append(closers, tombsr) |
| 848 | |
| 849 | postings := postingsFunc(ctx, indexr) |
| 850 | // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. |