TestDisableReshardOnRetry asserts that resharding should be disabled when a recoverable error is returned from remote_write.
(t *testing.T)
| 764 | // TestDisableReshardOnRetry asserts that resharding should be disabled when a |
| 765 | // recoverable error is returned from remote_write. |
| 766 | func TestDisableReshardOnRetry(t *testing.T) { |
| 767 | t.Parallel() |
| 768 | onStoredContext, onStoreCalled := context.WithCancel(context.Background()) |
| 769 | defer onStoreCalled() |
| 770 | |
| 771 | var ( |
| 772 | recs = testwal.GenerateRecords(recCase{Series: 100, SamplesPerSeries: 100}) |
| 773 | |
| 774 | cfg = config.DefaultQueueConfig |
| 775 | mcfg = config.DefaultMetadataConfig |
| 776 | retryAfter = time.Second |
| 777 | |
| 778 | metrics = newQueueManagerMetrics(nil, "", "") |
| 779 | |
| 780 | client = &MockWriteClient{ |
| 781 | StoreFunc: func(context.Context, []byte, int) (WriteResponseStats, error) { |
| 782 | onStoreCalled() |
| 783 | |
| 784 | return WriteResponseStats{}, RecoverableError{ |
| 785 | error: errors.New("fake error"), |
| 786 | retryAfter: model.Duration(retryAfter), |
| 787 | } |
| 788 | }, |
| 789 | NameFunc: func() string { return "mock" }, |
| 790 | EndpointFunc: func() string { return "http://fake:9090/api/v1/write" }, |
| 791 | } |
| 792 | ) |
| 793 | |
| 794 | m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, false, remoteapi.WriteV1MessageType, nil) |
| 795 | m.StoreSeries(recs.Series, 0) |
| 796 | |
| 797 | // Attempt to append samples while the manager is running. We immediately stop the |
| 798 | // manager after the recoverable error is generated to prevent the manager |
| 799 | // from resharding itself. |
| 800 | m.Start() |
| 801 | { |
| 802 | m.Append(recs.Samples) |
| 803 | |
| 804 | select { |
| 805 | case <-onStoredContext.Done(): |
| 806 | case <-time.After(time.Minute): |
| 807 | require.FailNow(t, "timed out waiting for client to be sent metrics") |
| 808 | } |
| 809 | } |
| 810 | m.Stop() |
| 811 | |
| 812 | require.Eventually(t, func() bool { |
| 813 | // Force m.lastSendTimestamp to be current so the last send timestamp isn't |
| 814 | // the reason resharding is disabled. |
| 815 | m.lastSendTimestamp.Store(time.Now().Unix()) |
| 816 | return m.shouldReshard(m.numShards+1) == false |
| 817 | }, time.Minute, 10*time.Millisecond, "shouldReshard was never disabled") |
| 818 | |
| 819 | // After 2x retryAfter, resharding should be enabled again. |
| 820 | require.Eventually(t, func() bool { |
| 821 | // Force m.lastSendTimestamp to be current so the last send timestamp isn't |
| 822 | // the reason resharding is disabled. |
| 823 | m.lastSendTimestamp.Store(time.Now().Unix()) |
nothing calls this directly
no test coverage detected
searching dependent graphs…