MCPcopy
hub / github.com/prometheus/prometheus / sendSamplesWithBackoff

Method sendSamplesWithBackoff

storage/remote/queue_manager.go:1750–1852  ·  view source on GitHub ↗

sendSamplesWithBackoff sends samples to the remote storage, retrying with backoff on recoverable errors.

(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf compression.EncodeBuffer, compr compression.Type)

Source from the content-addressed store, hash-verified

1748
1749// sendSamplesWithBackoff to the remote storage with backoff for recoverable errors.
1750func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf compression.EncodeBuffer, compr compression.Type) (WriteResponseStats, error) {
1751 // Build the WriteRequest with no metadata.
1752 req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, nil, buf, compr)
1753 s.qm.buildRequestLimitTimestamp.Store(lowest)
1754 if err != nil {
1755 // Failing to build the write request is non-recoverable, since it will
1756 // only error if marshaling the proto to bytes fails.
1757 return WriteResponseStats{}, err
1758 }
1759
1760 reqSize := len(req)
1761 sc := sendBatchContext{
1762 ctx: ctx,
1763 sampleCount: sampleCount,
1764 exemplarCount: exemplarCount,
1765 histogramCount: histogramCount,
1766 metadataCount: metadataCount,
1767 reqSize: reqSize,
1768 }
1769
1770 metricsUpdater := batchMetricsUpdater{
1771 metrics: s.qm.metrics,
1772 }
1773
1774 // Since we retry writes via attemptStore and sendWriteRequestWithBackoff we need
1775 // to track the total amount of accepted data across the various attempts.
1776 accumulatedStats := WriteResponseStats{}
1777 var accumulatedStatsMu sync.Mutex
1778 addStats := func(rs WriteResponseStats) {
1779 accumulatedStatsMu.Lock()
1780 accumulatedStats = accumulatedStats.Add(rs)
1781 accumulatedStatsMu.Unlock()
1782 }
1783
1784 // An anonymous function allows us to defer the completion of our per-try spans
1785 // without causing a memory leak, and it has the nice effect of not propagating any
1786 // parameters for sendSamplesWithBackoff/3.
1787 attemptStore := func(try int) error {
1788 currentTime := time.Now()
1789 lowest := s.qm.buildRequestLimitTimestamp.Load()
1790 if isSampleOld(currentTime, time.Duration(s.qm.cfg.SampleAgeLimit), lowest) {
1791 // This will filter out old samples during retries.
1792 req2, _, lowest, err := buildWriteRequest(
1793 s.qm.logger,
1794 samples,
1795 nil,
1796 pBuf,
1797 isTimeSeriesOldFilter(s.qm.metrics, currentTime, time.Duration(s.qm.cfg.SampleAgeLimit)),
1798 buf,
1799 compr,
1800 )
1801 s.qm.buildRequestLimitTimestamp.Store(lowest)
1802 if err != nil {
1803 return err
1804 }
1805 req = req2
1806 }
1807

Callers 1

sendSamplesMethod · 0.95

Calls 15

AddMethod · 0.95
recordBatchAttemptMethod · 0.95
recordLatencyMethod · 0.95
recordRetryMethod · 0.95
buildWriteRequestFunction · 0.85
isSampleOldFunction · 0.85
isTimeSeriesOldFilterFunction · 0.85
createBatchSpanFunction · 0.85
LockMethod · 0.80
DurationMethod · 0.80
clientMethod · 0.80

Tested by

no test coverage detected