Append queues samples to be sent to the remote storage. It blocks until all samples are enqueued on their shards or a shutdown signal is received, and returns false only if it stopped because of a shutdown signal.
(samples []record.RefSample)
| 725 | // Append queues a sample to be sent to the remote storage. Blocks until all samples are |
| 726 | // enqueued on their shards or a shutdown signal is received. |
| 727 | func (t *QueueManager) Append(samples []record.RefSample) bool { |
| 728 | currentTime := time.Now() |
| 729 | outer: |
| 730 | for _, s := range samples { |
| 731 | if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), s.T) { |
| 732 | t.metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc() |
| 733 | continue |
| 734 | } |
| 735 | t.seriesMtx.Lock() |
| 736 | lbls, ok := t.seriesLabels[s.Ref] |
| 737 | if !ok { |
| 738 | t.dataDropped.incr(1) |
| 739 | if _, ok := t.droppedSeries[s.Ref]; !ok { |
| 740 | t.logger.Info("Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref) |
| 741 | t.metrics.droppedSamplesTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc() |
| 742 | } else { |
| 743 | t.metrics.droppedSamplesTotal.WithLabelValues(reasonDroppedSeries).Inc() |
| 744 | } |
| 745 | t.seriesMtx.Unlock() |
| 746 | continue |
| 747 | } |
| 748 | // TODO(cstyan): Handle or at least log an error if no metadata is found. |
| 749 | // See https://github.com/prometheus/prometheus/issues/14405 |
| 750 | meta := t.seriesMetadata[s.Ref] |
| 751 | t.seriesMtx.Unlock() |
| 752 | // Start with a very small backoff. This should not be t.cfg.MinBackoff |
| 753 | // as it can happen without errors, and we want to pickup work after |
| 754 | // filling a queue/resharding as quickly as possible. |
| 755 | // TODO: Consider using the average duration of a request as the backoff. |
| 756 | backoff := model.Duration(5 * time.Millisecond) |
| 757 | for { |
| 758 | select { |
| 759 | case <-t.quit: |
| 760 | return false |
| 761 | default: |
| 762 | } |
| 763 | if t.shards.enqueue(s.Ref, timeSeries{ |
| 764 | seriesLabels: lbls, |
| 765 | metadata: meta, |
| 766 | startTimestamp: s.ST, |
| 767 | timestamp: s.T, |
| 768 | value: s.V, |
| 769 | sType: tSample, |
| 770 | }) { |
| 771 | continue outer |
| 772 | } |
| 773 | |
| 774 | t.metrics.enqueueRetriesTotal.Inc() |
| 775 | time.Sleep(time.Duration(backoff)) |
| 776 | backoff *= 2 |
| 777 | // It is reasonable to use t.cfg.MaxBackoff here, as if we have hit |
| 778 | // the full backoff we are likely waiting for external resources. |
| 779 | if backoff > t.cfg.MaxBackoff { |
| 780 | backoff = t.cfg.MaxBackoff |
| 781 | } |
| 782 | } |
| 783 | } |
| 784 | return true |