MCPcopy Index your code
hub / github.com/riverqueue/river / TestProducer_WithNotifier

Function TestProducer_WithNotifier

producer_test.go:217–269  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

215}
216
217func TestProducer_WithNotifier(t *testing.T) {
218 t.Parallel()
219
220 testProducer(t, func(ctx context.Context, t *testing.T) (*producer, chan []jobcompleter.CompleterJobUpdated) {
221 t.Helper()
222
223 var (
224 archetype = riversharedtest.BaseServiceArchetype(t)
225 dbPool = riversharedtest.DBPool(ctx, t)
226 driver = riverpgxv5.New(dbPool)
227 exec = driver.GetExecutor()
228 jobUpdates = make(chan []jobcompleter.CompleterJobUpdated, 10)
229 schema = riverdbtest.TestSchema(ctx, t, driver, nil)
230 listener = driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema})
231 pilot = &riverpilot.StandardPilot{}
232 queueName = fmt.Sprintf("test-producer-with-notifier-%05d", randutil.IntBetween(1, 100_000))
233 )
234
235 completer := jobcompleter.NewInlineCompleter(archetype, schema, exec, &riverpilot.StandardPilot{}, jobUpdates)
236 {
237 require.NoError(t, completer.Start(ctx))
238 t.Cleanup(completer.Stop)
239 }
240
241 notifier := notifier.New(archetype, listener)
242 {
243 require.NoError(t, notifier.Start(ctx))
244 t.Cleanup(notifier.Stop)
245 }
246
247 return newProducer(archetype, exec, pilot, &producerConfig{
248 ClientID: testClientID,
249 Completer: completer,
250 ErrorHandler: newTestErrorHandler(),
251 FetchCooldown: FetchCooldownDefault,
252 FetchPollInterval: 50 * time.Millisecond, // more aggressive than normal so in case we miss the event, tests still pass quickly
253 HookLookupByJob: hooklookup.NewJobHookLookup(),
254 HookLookupGlobal: hooklookup.NewHookLookup(nil),
255 JobTimeout: JobTimeoutDefault,
256 MaxWorkers: 1_000,
257 MiddlewareLookupGlobal: middlewarelookup.NewMiddlewareLookup(nil),
258 Notifier: notifier,
259 Queue: queueName,
260 QueuePollInterval: queuePollIntervalDefault,
261 QueueReportInterval: queueReportIntervalDefault,
262 RetryPolicy: &DefaultClientRetryPolicy{},
263 SchedulerInterval: riverinternaltest.SchedulerShortInterval,
264 Schema: schema,
265 StaleProducerRetentionPeriod: time.Minute,
266 Workers: NewWorkers(),
267 }), jobUpdates
268 })
269}
270
271func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testing.T) (*producer, chan []jobcompleter.CompleterJobUpdated)) {
272 t.Helper()

Callers

nothing calls this directly

Calls 15

StartMethod · 0.95
BaseServiceArchetypeFunction · 0.92
DBPoolFunction · 0.92
NewFunction · 0.92
TestSchemaFunction · 0.92
IntBetweenFunction · 0.92
NewInlineCompleterFunction · 0.92
NewFunction · 0.92
NewJobHookLookupFunction · 0.92
NewHookLookupFunction · 0.92
NewMiddlewareLookupFunction · 0.92
testProducerFunction · 0.85

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…