(t *testing.T)
| 6754 | } |
| 6755 | |
| 6756 | func Test_Client_RetryPolicy(t *testing.T) { |
| 6757 | t.Parallel() |
| 6758 | |
| 6759 | ctx := context.Background() |
| 6760 | |
| 6761 | t.Run("RetryUntilDiscarded", func(t *testing.T) { |
| 6762 | t.Parallel() |
| 6763 | |
| 6764 | var ( |
| 6765 | dbPool = riversharedtest.DBPool(ctx, t) |
| 6766 | driver = riverpgxv5.New(dbPool) |
| 6767 | schema = riverdbtest.TestSchema(ctx, t, driver, nil) |
| 6768 | config = newTestConfig(t, schema) |
| 6769 | ) |
| 6770 | |
| 6771 | // The default policy would work too, but this takes some variability |
| 6772 | // out of it to make comparisons easier. |
| 6773 | config.RetryPolicy = &retrypolicytest.RetryPolicyNoJitter{} |
| 6774 | |
| 6775 | type JobArgs struct { |
| 6776 | testutil.JobArgsReflectKind[JobArgs] |
| 6777 | } |
| 6778 | |
| 6779 | AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { |
| 6780 | return errors.New("job error") |
| 6781 | })) |
| 6782 | |
| 6783 | client := newTestClient(t, dbPool, config) |
| 6784 | |
| 6785 | now := client.baseService.Time.StubNow(time.Now().UTC()) |
| 6786 | t.Logf("Now: %s", now) |
| 6787 | |
| 6788 | subscribeChan, cancel := client.Subscribe(EventKindJobCompleted, EventKindJobFailed) |
| 6789 | t.Cleanup(cancel) |
| 6790 | |
| 6791 | originalJobs := make([]*rivertype.JobRow, rivercommon.MaxAttemptsDefault) |
| 6792 | for i := range originalJobs { |
| 6793 | insertRes, err := client.Insert(ctx, JobArgs{}, nil) |
| 6794 | require.NoError(t, err) |
| 6795 | |
| 6796 | // regression protection to ensure we're testing the right number of jobs: |
| 6797 | require.Equal(t, rivercommon.MaxAttemptsDefault, insertRes.Job.MaxAttempts) |
| 6798 | |
| 6799 | updatedJob, err := client.driver.GetExecutor().JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{ |
| 6800 | ID: insertRes.Job.ID, |
| 6801 | AttemptedAtDoUpdate: true, |
| 6802 | AttemptedAt: &now, // we want a value here, but it'll be overwritten as jobs are locked by the producer |
| 6803 | AttemptDoUpdate: true, |
| 6804 | Attempt: i, // starts at i, but will be i + 1 by the time it's being worked |
| 6805 | Schema: schema, |
| 6806 | |
| 6807 | // Need to find a cleaner way around this, but state is required |
| 6808 | // because sqlc can't encode an empty string to the |
| 6809 | // corresponding enum. This value is not actually used because |
| 6810 | // StateDoUpdate was not supplied. |
| 6811 | State: rivertype.JobStateAvailable, |
| 6812 | }) |
| 6813 | require.NoError(t, err) |
nothing calls this directly
no test coverage detected
searching dependent graphs…