hub / github.com/riverqueue/river / Test_Client_RetryPolicy

Function Test_Client_RetryPolicy

client_test.go:6756–6877
(t *testing.T)


func Test_Client_RetryPolicy(t *testing.T) {
	t.Parallel()

	ctx := context.Background()

	t.Run("RetryUntilDiscarded", func(t *testing.T) {
		t.Parallel()

		var (
			dbPool = riversharedtest.DBPool(ctx, t)
			driver = riverpgxv5.New(dbPool)
			schema = riverdbtest.TestSchema(ctx, t, driver, nil)
			config = newTestConfig(t, schema)
		)

		// The default policy would work too, but this takes some variability
		// out of it to make comparisons easier.
		config.RetryPolicy = &retrypolicytest.RetryPolicyNoJitter{}

		type JobArgs struct {
			testutil.JobArgsReflectKind[JobArgs]
		}

		AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
			return errors.New("job error")
		}))

		client := newTestClient(t, dbPool, config)

		now := client.baseService.Time.StubNow(time.Now().UTC())
		t.Logf("Now: %s", now)

		subscribeChan, cancel := client.Subscribe(EventKindJobCompleted, EventKindJobFailed)
		t.Cleanup(cancel)

		originalJobs := make([]*rivertype.JobRow, rivercommon.MaxAttemptsDefault)
		for i := range originalJobs {
			insertRes, err := client.Insert(ctx, JobArgs{}, nil)
			require.NoError(t, err)

			// regression protection to ensure we're testing the right number of jobs:
			require.Equal(t, rivercommon.MaxAttemptsDefault, insertRes.Job.MaxAttempts)

			updatedJob, err := client.driver.GetExecutor().JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{
				ID:                  insertRes.Job.ID,
				AttemptedAtDoUpdate: true,
				AttemptedAt:         &now, // we want a value here, but it'll be overwritten as jobs are locked by the producer
				AttemptDoUpdate:     true,
				Attempt:             i, // starts at i, but will be i + 1 by the time it's being worked
				Schema:              schema,

				// Need to find a cleaner way around this, but state is required
				// because sqlc can't encode an empty string to the
				// corresponding enum. This value is not actually used because
				// StateDoUpdate was not supplied.
				State: rivertype.JobStateAvailable,
			})
			require.NoError(t, err)

Callers

nothing calls this directly

Calls 15

DBPool · Function · 0.92
New · Function · 0.92
TestSchema · Function · 0.92
WaitOrTimeout · Function · 0.92
Map · Function · 0.92
KeyBy · Function · 0.92
AddWorker · Function · 0.85
WorkFunc · Function · 0.85
newTestClient · Function · 0.85
Subscribe · Method · 0.80
Cleanup · Method · 0.80
Insert · Method · 0.80

Tested by

no test coverage detected
