MCPcopy
hub / github.com/prometheus/prometheus / Append

Method Append

storage/remote/queue_manager.go:727–785  ·  view source on GitHub ↗

Append queues the given samples to be sent to the remote storage. It blocks until all samples are enqueued on their shards or a shutdown signal is received, returning true in the former case and false in the latter.

(samples []record.RefSample)

Source from the content-addressed store, hash-verified

725// Append queues a sample to be sent to the remote storage. Blocks until all samples are
726// enqueued on their shards or a shutdown signal is received.
727func (t *QueueManager) Append(samples []record.RefSample) bool {
728 currentTime := time.Now()
729outer:
730 for _, s := range samples {
731 if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), s.T) {
732 t.metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc()
733 continue
734 }
735 t.seriesMtx.Lock()
736 lbls, ok := t.seriesLabels[s.Ref]
737 if !ok {
738 t.dataDropped.incr(1)
739 if _, ok := t.droppedSeries[s.Ref]; !ok {
740 t.logger.Info("Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref)
741 t.metrics.droppedSamplesTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
742 } else {
743 t.metrics.droppedSamplesTotal.WithLabelValues(reasonDroppedSeries).Inc()
744 }
745 t.seriesMtx.Unlock()
746 continue
747 }
748 // TODO(cstyan): Handle or at least log an error if no metadata is found.
749 // See https://github.com/prometheus/prometheus/issues/14405
750 meta := t.seriesMetadata[s.Ref]
751 t.seriesMtx.Unlock()
752 // Start with a very small backoff. This should not be t.cfg.MinBackoff
753 // as it can happen without errors, and we want to pickup work after
754 // filling a queue/resharding as quickly as possible.
755 // TODO: Consider using the average duration of a request as the backoff.
756 backoff := model.Duration(5 * time.Millisecond)
757 for {
758 select {
759 case <-t.quit:
760 return false
761 default:
762 }
763 if t.shards.enqueue(s.Ref, timeSeries{
764 seriesLabels: lbls,
765 metadata: meta,
766 startTimestamp: s.ST,
767 timestamp: s.T,
768 value: s.V,
769 sType: tSample,
770 }) {
771 continue outer
772 }
773
774 t.metrics.enqueueRetriesTotal.Inc()
775 time.Sleep(time.Duration(backoff))
776 backoff *= 2
777 // It is reasonable to use t.cfg.MaxBackoff here, as if we have hit
778 // the full backoff we are likely waiting for external resources.
779 if backoff > t.cfg.MaxBackoff {
780 backoff = t.cfg.MaxBackoff
781 }
782 }
783 }
784 return true

Callers 1

Calls 6

isSampleOld — Function · 0.85
Duration — Method · 0.80
Inc — Method · 0.80
Lock — Method · 0.80
incr — Method · 0.80
enqueue — Method · 0.45

Tested by 1