MCPcopy Index your code
hub / github.com/riverqueue/river / testProducer

Function testProducer

producer_test.go:271–796  ·  view source on GitHub ↗
(t *testing.T, makeProducer func(ctx context.Context, t *testing.T) (*producer, chan []jobcompleter.CompleterJobUpdated))

Source from the content-addressed store, hash-verified

269}
270
271func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testing.T) (*producer, chan []jobcompleter.CompleterJobUpdated)) {
272 t.Helper()
273
274 ctx := context.Background()
275
276 type testBundle struct {
277 archetype *baseservice.Archetype
278 completer jobcompleter.JobCompleter
279 config *Config
280 exec riverdriver.Executor
281 jobUpdates chan jobcompleter.CompleterJobUpdated
282 queue string
283 timeBeforeStart time.Time
284 workers *Workers
285 }
286
287 setup := func(t *testing.T) (*producer, *testBundle) {
288 t.Helper()
289
290 timeBeforeStart := time.Now().UTC()
291
292 producer, jobUpdates := makeProducer(ctx, t)
293 producer.testSignals.Init(t)
294 config := newTestConfig(t, producer.config.Schema)
295
296 jobUpdatesFlattened := make(chan jobcompleter.CompleterJobUpdated, 10)
297 go func() {
298 for updates := range jobUpdates {
299 for _, update := range updates {
300 jobUpdatesFlattened <- update
301 }
302 }
303 }()
304
305 return producer, &testBundle{
306 archetype: &producer.Archetype,
307 completer: producer.completer,
308 config: config,
309 exec: producer.exec,
310 jobUpdates: jobUpdatesFlattened,
311 queue: producer.config.Queue,
312 timeBeforeStart: timeBeforeStart,
313 workers: producer.workers,
314 }
315 }
316
317 mustInsert := func(ctx context.Context, t *testing.T, producer *producer, bundle *testBundle, args JobArgs) {
318 t.Helper()
319
320 insertParams, err := insertParamsFromConfigArgsAndOptions(bundle.archetype, bundle.config, args, &InsertOpts{
321 Queue: bundle.queue,
322 })
323 require.NoError(t, err)
324 if insertParams.ScheduledAt == nil {
325 // Without this, newly inserted jobs will pick up a scheduled_at time
326 // that's the current Go time at the time of insertion. If the test is
327 // using a transaction, this will be after the `now()` time in the
328 // transaction that gets used by default in `JobGetAvailable`, so new jobs

Callers 2

TestProducer_PollOnlyFunction · 0.85

Calls 15

WaitOrTimeoutFunction · 0.92
WaitOrTimeoutNFunction · 0.92
LoggerWarnFunction · 0.92
StressFunction · 0.92
QueueFunction · 0.92
PtrFunction · 0.92
AddWorkerFunction · 0.85
WorkFuncFunction · 0.85
emitQueueNotificationFunction · 0.85
StartWorkContextMethod · 0.80
CleanupMethod · 0.80

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…