(t *testing.T)
| 113 | } |
| 114 | |
| 115 | func TestJobExecutor_Execute(t *testing.T) { |
| 116 | t.Parallel() |
| 117 | |
| 118 | ctx := context.Background() |
| 119 | |
| 120 | type testBundle struct { |
| 121 | completer *jobcompleter.InlineCompleter |
| 122 | exec riverdriver.Executor |
| 123 | errorHandler *testErrorHandler |
| 124 | jobRow *rivertype.JobRow |
| 125 | updateCh <-chan []jobcompleter.CompleterJobUpdated |
| 126 | } |
| 127 | |
| 128 | setup := func(t *testing.T) (*JobExecutor, *testBundle) { |
| 129 | t.Helper() |
| 130 | |
| 131 | var ( |
| 132 | tx = riverdbtest.TestTxPgx(ctx, t) |
| 133 | archetype = riversharedtest.BaseServiceArchetype(t) |
| 134 | exec = riverpgxv5.New(nil).UnwrapExecutor(tx) |
| 135 | updateCh = make(chan []jobcompleter.CompleterJobUpdated, 10) |
| 136 | completer = jobcompleter.NewInlineCompleter(archetype, "", exec, &riverpilot.StandardPilot{}, updateCh) |
| 137 | ) |
| 138 | |
| 139 | t.Cleanup(completer.Stop) |
| 140 | |
| 141 | workUnitFactory := newWorkUnitFactoryWithCustomRetry(func() error { return nil }, nil) |
| 142 | |
| 143 | now := time.Now().UTC() |
| 144 | results, err := exec.JobInsertFastMany(ctx, &riverdriver.JobInsertFastManyParams{ |
| 145 | Jobs: []*riverdriver.JobInsertFastParams{ |
| 146 | { |
| 147 | EncodedArgs: []byte("{}"), |
| 148 | Kind: "jobexecutor_test", |
| 149 | MaxAttempts: rivercommon.MaxAttemptsDefault, |
| 150 | Priority: rivercommon.PriorityDefault, |
| 151 | Queue: rivercommon.QueueDefault, |
| 152 | // Needs to be explicitly set to a "now" horizon that's aligned with the |
| 153 | // JobGetAvailable call. InsertMany applies a default scheduled_at in Go |
| 154 | // so it can't pick up the Postgres-level `now()` default. |
| 155 | ScheduledAt: ptrutil.Ptr(now), |
| 156 | State: rivertype.JobStateAvailable, |
| 157 | }, |
| 158 | }, |
| 159 | }) |
| 160 | require.NoError(t, err) |
| 161 | |
| 162 | // Fetch the job to make sure it's marked as running: |
| 163 | jobs, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ |
| 164 | MaxToLock: 1, |
| 165 | Now: ptrutil.Ptr(now), |
| 166 | Queue: rivercommon.QueueDefault, |
| 167 | }) |
| 168 | require.NoError(t, err) |
| 169 | |
| 170 | require.Len(t, jobs, 1) |
| 171 | require.Equal(t, results[0].Job.ID, jobs[0].ID) |
| 172 | job := jobs[0] |
nothing calls this directly
no test coverage detected
searching dependent graphs…