MCPcopy
hub / github.com/prometheus/prometheus / TestDisableReshardOnRetry

Function TestDisableReshardOnRetry

storage/remote/queue_manager_test.go:766–826  ·  view source on GitHub ↗

TestDisableReshardOnRetry asserts that resharding should be disabled when a recoverable error is returned from remote_write.

(t *testing.T)

Source from the content-addressed store, hash-verified

764// TestDisableReshardOnRetry asserts that resharding should be disabled when a
765// recoverable error is returned from remote_write.
766func TestDisableReshardOnRetry(t *testing.T) {
767 t.Parallel()
768 onStoredContext, onStoreCalled := context.WithCancel(context.Background())
769 defer onStoreCalled()
770
771 var (
772 recs = testwal.GenerateRecords(recCase{Series: 100, SamplesPerSeries: 100})
773
774 cfg = config.DefaultQueueConfig
775 mcfg = config.DefaultMetadataConfig
776 retryAfter = time.Second
777
778 metrics = newQueueManagerMetrics(nil, "", "")
779
780 client = &MockWriteClient{
781 StoreFunc: func(context.Context, []byte, int) (WriteResponseStats, error) {
782 onStoreCalled()
783
784 return WriteResponseStats{}, RecoverableError{
785 error: errors.New("fake error"),
786 retryAfter: model.Duration(retryAfter),
787 }
788 },
789 NameFunc: func() string { return "mock" },
790 EndpointFunc: func() string { return "http://fake:9090/api/v1/write" },
791 }
792 )
793
794 m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, false, remoteapi.WriteV1MessageType, nil)
795 m.StoreSeries(recs.Series, 0)
796
797 // Attempt to append samples while the manager is running. We immediately stop the
798 // manager after the recoverable error is generated to prevent the manager
799 // from resharding itself.
800 m.Start()
801 {
802 m.Append(recs.Samples)
803
804 select {
805 case <-onStoredContext.Done():
806 case <-time.After(time.Minute):
807 require.FailNow(t, "timed out waiting for client to be sent metrics")
808 }
809 }
810 m.Stop()
811
812 require.Eventually(t, func() bool {
813 // Force m.lastSendTimestamp to be current so the last send timestamp isn't
814 // the reason resharding is disabled.
815 m.lastSendTimestamp.Store(time.Now().Unix())
816 return m.shouldReshard(m.numShards+1) == false
817 }, time.Minute, 10*time.Millisecond, "shouldReshard was never disabled")
818
819 // After 2x retryAfter, resharding should be enabled again.
820 require.Eventually(t, func() bool {
821 // Force m.lastSendTimestamp to be current so the last send timestamp isn't
822 // the reason resharding is disabled.
823 m.lastSendTimestamp.Store(time.Now().Unix())

Callers

nothing calls this directly

Calls 15

StoreSeriesMethod · 0.95
StartMethod · 0.95
AppendMethod · 0.95
StopMethod · 0.95
shouldReshardMethod · 0.95
GenerateRecordsFunction · 0.92
EmptyLabelsFunction · 0.92
newQueueManagerMetricsFunction · 0.85
NewQueueManagerFunction · 0.85
newEWMARateFunction · 0.85
newPoolFunction · 0.85

Tested by

no test coverage detected

Used in the wild: real call sites across dependent graphs

searching dependent graphs…