sendSamplesWithBackoff to the remote storage with backoff for recoverable errors.
(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf compression.EncodeBuffer, compr compression.Type)
| 1748 | |
| 1749 | // sendSamplesWithBackoff to the remote storage with backoff for recoverable errors. |
| 1750 | func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf compression.EncodeBuffer, compr compression.Type) (WriteResponseStats, error) { |
| 1751 | // Build the WriteRequest with no metadata. |
| 1752 | req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, nil, buf, compr) |
| 1753 | s.qm.buildRequestLimitTimestamp.Store(lowest) |
| 1754 | if err != nil { |
| 1755 | // Failing to build the write request is non-recoverable, since it will |
| 1756 | // only error if marshaling the proto to bytes fails. |
| 1757 | return WriteResponseStats{}, err |
| 1758 | } |
| 1759 | |
| 1760 | reqSize := len(req) |
| 1761 | sc := sendBatchContext{ |
| 1762 | ctx: ctx, |
| 1763 | sampleCount: sampleCount, |
| 1764 | exemplarCount: exemplarCount, |
| 1765 | histogramCount: histogramCount, |
| 1766 | metadataCount: metadataCount, |
| 1767 | reqSize: reqSize, |
| 1768 | } |
| 1769 | |
| 1770 | metricsUpdater := batchMetricsUpdater{ |
| 1771 | metrics: s.qm.metrics, |
| 1772 | } |
| 1773 | |
| 1774 | // Since we retry writes via attemptStore and sendWriteRequestWithBackoff we need |
| 1775 | // to track the total amount of accepted data across the various attempts. |
| 1776 | accumulatedStats := WriteResponseStats{} |
| 1777 | var accumulatedStatsMu sync.Mutex |
| 1778 | addStats := func(rs WriteResponseStats) { |
| 1779 | accumulatedStatsMu.Lock() |
| 1780 | accumulatedStats = accumulatedStats.Add(rs) |
| 1781 | accumulatedStatsMu.Unlock() |
| 1782 | } |
| 1783 | |
| 1784 | // An anonymous function allows us to defer the completion of our per-try spans |
| 1785 | // without causing a memory leak, and it has the nice effect of not propagating any |
| 1786 | // parameters for sendSamplesWithBackoff/3. |
| 1787 | attemptStore := func(try int) error { |
| 1788 | currentTime := time.Now() |
| 1789 | lowest := s.qm.buildRequestLimitTimestamp.Load() |
| 1790 | if isSampleOld(currentTime, time.Duration(s.qm.cfg.SampleAgeLimit), lowest) { |
| 1791 | // This will filter out old samples during retries. |
| 1792 | req2, _, lowest, err := buildWriteRequest( |
| 1793 | s.qm.logger, |
| 1794 | samples, |
| 1795 | nil, |
| 1796 | pBuf, |
| 1797 | isTimeSeriesOldFilter(s.qm.metrics, currentTime, time.Duration(s.qm.cfg.SampleAgeLimit)), |
| 1798 | buf, |
| 1799 | compr, |
| 1800 | ) |
| 1801 | s.qm.buildRequestLimitTimestamp.Store(lowest) |
| 1802 | if err != nil { |
| 1803 | return err |
| 1804 | } |
| 1805 | req = req2 |
| 1806 | } |
| 1807 |
no test coverage detected