MCPcopy
hub / github.com/prometheus/prometheus / processWALSamples

Method processWALSamples

tsdb/head_wal.go:667–795  ·  view source on GitHub ↗

processWALSamples adds the samples it receives to the head and passes the buffer received to an output channel for reuse. Samples before the minValidTime timestamp are discarded.

(h *Head, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk)

Source from the content-addressed store, hash-verified

665// the buffer received to an output channel for reuse.
666// Samples before the minValidTime timestamp are discarded.
667func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (map[chunks.HeadSeriesRef]struct{}, uint64, uint64, uint64) {
668 defer close(wp.output)
669 defer close(wp.histogramsOutput)
670
671 missingSeries := make(map[chunks.HeadSeriesRef]struct{})
672 var unknownSampleRefs, unknownHistogramRefs, mmapOverlappingChunks uint64
673
674 minValidTime := h.minValidTime.Load()
675 mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
676 // storeST must be passed here so that appendPreprocessor cuts an in-progress
677 // XOR chunk immediately when replaying into a head with ST storage enabled.
678 // XOR chunks cannot store start timestamps and must not be continued with
679 // XOR2 appends when ST storage is active.
680 appendChunkOpts := chunkOpts{
681 chunkDiskMapper: h.chunkDiskMapper,
682 chunkRange: h.chunkRange.Load(),
683 samplesPerChunk: h.opts.SamplesPerChunk,
684 useXOR2: h.opts.UseXOR2FloatEncoding(),
685 storeST: h.opts.EnableSTStorage.Load(),
686 }
687
688 for in := range wp.input {
689 if in.existingSeries != nil {
690 mmc := mmappedChunks[in.walSeriesRef]
691 oooMmc := oooMmappedChunks[in.walSeriesRef]
692 if h.resetSeriesWithMMappedChunks(in.existingSeries, mmc, oooMmc, in.walSeriesRef) {
693 mmapOverlappingChunks++
694 }
695 continue
696 }
697
698 for _, s := range in.samples {
699 ms := h.series.getByID(s.Ref)
700 if ms == nil {
701 unknownSampleRefs++
702 missingSeries[s.Ref] = struct{}{}
703 continue
704 }
705 if s.T <= ms.mmMaxTime {
706 continue
707 }
708
709 if !value.IsStaleNaN(ms.lastValue) && value.IsStaleNaN(s.V) {
710 h.numStaleSeries.Inc()
711 }
712 if value.IsStaleNaN(ms.lastValue) && !value.IsStaleNaN(s.V) {
713 h.numStaleSeries.Dec()
714 }
715
716 h.appendChunkAndMmap(ms, func() bool {
717 _, chunkCreated := ms.append(s.ST, s.T, s.V, 0, appendChunkOpts)
718 return chunkCreated
719 })
720 if s.T > maxt {
721 maxt = s.T
722 }
723 if s.T < mint {
724 mint = s.T

Callers 1

loadWALMethod · 0.80

Calls 13

IsStaleNaNFunction · 0.92
UseXOR2FloatEncodingMethod · 0.80
getByIDMethod · 0.80
IncMethod · 0.80
DecMethod · 0.80
appendChunkAndMmapMethod · 0.80
deleteSeriesByIDMethod · 0.80
updateMinMaxTimeMethod · 0.80
LoadMethod · 0.65
appendMethod · 0.65
appendHistogramMethod · 0.45

Tested by

no test coverage detected