(t *testing.T)
| 215 | } |
| 216 | |
| 217 | func TestProducer_WithNotifier(t *testing.T) { |
| 218 | t.Parallel() |
| 219 | |
| 220 | testProducer(t, func(ctx context.Context, t *testing.T) (*producer, chan []jobcompleter.CompleterJobUpdated) { |
| 221 | t.Helper() |
| 222 | |
| 223 | var ( |
| 224 | archetype = riversharedtest.BaseServiceArchetype(t) |
| 225 | dbPool = riversharedtest.DBPool(ctx, t) |
| 226 | driver = riverpgxv5.New(dbPool) |
| 227 | exec = driver.GetExecutor() |
| 228 | jobUpdates = make(chan []jobcompleter.CompleterJobUpdated, 10) |
| 229 | schema = riverdbtest.TestSchema(ctx, t, driver, nil) |
| 230 | listener = driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema}) |
| 231 | pilot = &riverpilot.StandardPilot{} |
| 232 | queueName = fmt.Sprintf("test-producer-with-notifier-%05d", randutil.IntBetween(1, 100_000)) |
| 233 | ) |
| 234 | |
| 235 | completer := jobcompleter.NewInlineCompleter(archetype, schema, exec, &riverpilot.StandardPilot{}, jobUpdates) |
| 236 | { |
| 237 | require.NoError(t, completer.Start(ctx)) |
| 238 | t.Cleanup(completer.Stop) |
| 239 | } |
| 240 | |
| 241 | notifier := notifier.New(archetype, listener) |
| 242 | { |
| 243 | require.NoError(t, notifier.Start(ctx)) |
| 244 | t.Cleanup(notifier.Stop) |
| 245 | } |
| 246 | |
| 247 | return newProducer(archetype, exec, pilot, &producerConfig{ |
| 248 | ClientID: testClientID, |
| 249 | Completer: completer, |
| 250 | ErrorHandler: newTestErrorHandler(), |
| 251 | FetchCooldown: FetchCooldownDefault, |
| 252 | FetchPollInterval: 50 * time.Millisecond, // more aggressive than normal so in case we miss the event, tests still pass quickly |
| 253 | HookLookupByJob: hooklookup.NewJobHookLookup(), |
| 254 | HookLookupGlobal: hooklookup.NewHookLookup(nil), |
| 255 | JobTimeout: JobTimeoutDefault, |
| 256 | MaxWorkers: 1_000, |
| 257 | MiddlewareLookupGlobal: middlewarelookup.NewMiddlewareLookup(nil), |
| 258 | Notifier: notifier, |
| 259 | Queue: queueName, |
| 260 | QueuePollInterval: queuePollIntervalDefault, |
| 261 | QueueReportInterval: queueReportIntervalDefault, |
| 262 | RetryPolicy: &DefaultClientRetryPolicy{}, |
| 263 | SchedulerInterval: riverinternaltest.SchedulerShortInterval, |
| 264 | Schema: schema, |
| 265 | StaleProducerRetentionPeriod: time.Minute, |
| 266 | Workers: NewWorkers(), |
| 267 | }), jobUpdates |
| 268 | }) |
| 269 | } |
| 270 | |
| 271 | func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testing.T) (*producer, chan []jobcompleter.CompleterJobUpdated)) { |
| 272 | t.Helper() |
nothing calls this directly
no test coverage detected
searching dependent graphs…