processWALSamples adds the samples it receives to the head and passes the buffer received to an output channel for reuse. Samples before the minValidTime timestamp are discarded.
(h *Head, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk)
| 665 | // the buffer received to an output channel for reuse. |
| 666 | // Samples before the minValidTime timestamp are discarded. |
| 667 | func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (map[chunks.HeadSeriesRef]struct{}, uint64, uint64, uint64) { |
| 668 | defer close(wp.output) |
| 669 | defer close(wp.histogramsOutput) |
| 670 | |
| 671 | missingSeries := make(map[chunks.HeadSeriesRef]struct{}) |
| 672 | var unknownSampleRefs, unknownHistogramRefs, mmapOverlappingChunks uint64 |
| 673 | |
| 674 | minValidTime := h.minValidTime.Load() |
| 675 | mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) |
| 676 | // storeST must be passed here so that appendPreprocessor cuts an in-progress |
| 677 | // XOR chunk immediately when replaying into a head with ST storage enabled. |
| 678 | // XOR chunks cannot store start timestamps and must not be continued with |
| 679 | // XOR2 appends when ST storage is active. |
| 680 | appendChunkOpts := chunkOpts{ |
| 681 | chunkDiskMapper: h.chunkDiskMapper, |
| 682 | chunkRange: h.chunkRange.Load(), |
| 683 | samplesPerChunk: h.opts.SamplesPerChunk, |
| 684 | useXOR2: h.opts.UseXOR2FloatEncoding(), |
| 685 | storeST: h.opts.EnableSTStorage.Load(), |
| 686 | } |
| 687 | |
| 688 | for in := range wp.input { |
| 689 | if in.existingSeries != nil { |
| 690 | mmc := mmappedChunks[in.walSeriesRef] |
| 691 | oooMmc := oooMmappedChunks[in.walSeriesRef] |
| 692 | if h.resetSeriesWithMMappedChunks(in.existingSeries, mmc, oooMmc, in.walSeriesRef) { |
| 693 | mmapOverlappingChunks++ |
| 694 | } |
| 695 | continue |
| 696 | } |
| 697 | |
| 698 | for _, s := range in.samples { |
| 699 | ms := h.series.getByID(s.Ref) |
| 700 | if ms == nil { |
| 701 | unknownSampleRefs++ |
| 702 | missingSeries[s.Ref] = struct{}{} |
| 703 | continue |
| 704 | } |
| 705 | if s.T <= ms.mmMaxTime { |
| 706 | continue |
| 707 | } |
| 708 | |
| 709 | if !value.IsStaleNaN(ms.lastValue) && value.IsStaleNaN(s.V) { |
| 710 | h.numStaleSeries.Inc() |
| 711 | } |
| 712 | if value.IsStaleNaN(ms.lastValue) && !value.IsStaleNaN(s.V) { |
| 713 | h.numStaleSeries.Dec() |
| 714 | } |
| 715 | |
| 716 | h.appendChunkAndMmap(ms, func() bool { |
| 717 | _, chunkCreated := ms.append(s.ST, s.T, s.V, 0, appendChunkOpts) |
| 718 | return chunkCreated |
| 719 | }) |
| 720 | if s.T > maxt { |
| 721 | maxt = s.T |
| 722 | } |
| 723 | if s.T < mint { |
| 724 | mint = s.T |
no test coverage detected