MCPcopy Index your code
hub / github.com/riverqueue/river / TestJobExecutor_Execute

Function TestJobExecutor_Execute

internal/jobexecutor/job_executor_test.go:115–1171  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

113}
114
115func TestJobExecutor_Execute(t *testing.T) {
116 t.Parallel()
117
118 ctx := context.Background()
119
120 type testBundle struct {
121 completer *jobcompleter.InlineCompleter
122 exec riverdriver.Executor
123 errorHandler *testErrorHandler
124 jobRow *rivertype.JobRow
125 updateCh <-chan []jobcompleter.CompleterJobUpdated
126 }
127
128 setup := func(t *testing.T) (*JobExecutor, *testBundle) {
129 t.Helper()
130
131 var (
132 tx = riverdbtest.TestTxPgx(ctx, t)
133 archetype = riversharedtest.BaseServiceArchetype(t)
134 exec = riverpgxv5.New(nil).UnwrapExecutor(tx)
135 updateCh = make(chan []jobcompleter.CompleterJobUpdated, 10)
136 completer = jobcompleter.NewInlineCompleter(archetype, "", exec, &riverpilot.StandardPilot{}, updateCh)
137 )
138
139 t.Cleanup(completer.Stop)
140
141 workUnitFactory := newWorkUnitFactoryWithCustomRetry(func() error { return nil }, nil)
142
143 now := time.Now().UTC()
144 results, err := exec.JobInsertFastMany(ctx, &riverdriver.JobInsertFastManyParams{
145 Jobs: []*riverdriver.JobInsertFastParams{
146 {
147 EncodedArgs: []byte("{}"),
148 Kind: "jobexecutor_test",
149 MaxAttempts: rivercommon.MaxAttemptsDefault,
150 Priority: rivercommon.PriorityDefault,
151 Queue: rivercommon.QueueDefault,
152 // Needs to be explicitly set to a "now" horizon that's aligned with the
153 // JobGetAvailable call. InsertMany applies a default scheduled_at in Go
154 // so it can't pick up the Postgres-level `now()` default.
155 ScheduledAt: ptrutil.Ptr(now),
156 State: rivertype.JobStateAvailable,
157 },
158 },
159 })
160 require.NoError(t, err)
161
162 // Fetch the job to make sure it's marked as running:
163 jobs, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
164 MaxToLock: 1,
165 Now: ptrutil.Ptr(now),
166 Queue: rivercommon.QueueDefault,
167 })
168 require.NoError(t, err)
169
170 require.Len(t, jobs, 1)
171 require.Equal(t, results[0].Job.ID, jobs[0].ID)
172 job := jobs[0]

Callers

nothing calls this directly

Calls 15

TestTxPgxFunction · 0.92
BaseServiceArchetypeFunction · 0.92
NewFunction · 0.92
NewInlineCompleterFunction · 0.92
PtrFunction · 0.92
InitFunction · 0.92
NewJobHookLookupFunction · 0.92
NewHookLookupFunction · 0.92
NewMiddlewareLookupFunction · 0.92
WaitOrTimeoutFunction · 0.92
JobCancelFunction · 0.92
Job_BuildFunction · 0.92

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…