MCPcopy Index your code
hub / github.com/riverqueue/river / Test_Client_InsertTriggersImmediateWork

Function Test_Client_InsertTriggersImmediateWork

client_test.go:7361–7423  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

7359}
7360
7361func Test_Client_InsertTriggersImmediateWork(t *testing.T) {
7362 t.Parallel()
7363
7364 ctx := context.Background()
7365
7366 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
7367 t.Cleanup(cancel)
7368
7369 var (
7370 dbPool = riversharedtest.DBPool(ctx, t)
7371 driver = riverpgxv5.New(dbPool)
7372 schema = riverdbtest.TestSchema(ctx, t, driver, nil)
7373 config = newTestConfig(t, schema)
7374 )
7375 config.FetchCooldown = 20 * time.Millisecond
7376 config.FetchPollInterval = 20 * time.Second // essentially disable polling
7377 config.Queues = map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}, "another_queue": {MaxWorkers: 1}}
7378
7379 type JobArgs struct {
7380 testutil.JobArgsReflectKind[JobArgs]
7381 }
7382
7383 doneCh := make(chan struct{})
7384 close(doneCh) // don't need to block any jobs from completing
7385 startedCh := make(chan int64)
7386 AddWorker(config.Workers, makeAwaitWorker[JobArgs](startedCh, doneCh))
7387
7388 client := newTestClient(t, dbPool, config)
7389
7390 startClient(ctx, t, client)
7391 riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started())
7392
7393 insertRes, err := client.Insert(ctx, JobArgs{}, nil)
7394 require.NoError(t, err)
7395
7396 // Wait for the client to be ready by waiting for a job to be executed:
7397 select {
7398 case jobID := <-startedCh:
7399 require.Equal(t, insertRes.Job.ID, jobID)
7400 case <-ctx.Done():
7401 t.Fatal("timed out waiting for warmup job to start")
7402 }
7403
7404 // Now that we've run one job, we shouldn't take longer than the cooldown to
7405 // fetch another after insertion. LISTEN/NOTIFY should ensure we find out
7406 // about the inserted job much faster than the poll interval.
7407 //
7408 // Note: we specifically use a different queue to ensure that the notify
7409 // limiter is immediately to fire on this queue.
7410 insertRes2, err := client.Insert(ctx, JobArgs{}, &InsertOpts{Queue: "another_queue"})
7411 require.NoError(t, err)
7412
7413 select {
7414 case jobID := <-startedCh:
7415 require.Equal(t, insertRes2.Job.ID, jobID)
7416 // As long as this is meaningfully shorter than the poll interval, we can be
7417 // sure the re-fetch came from listen/notify.
7418 case <-time.After(5 * time.Second):

Callers

nothing calls this directly

Calls 14

DBPoolFunction · 0.92
NewFunction · 0.92
TestSchemaFunction · 0.92
WaitOrTimeoutFunction · 0.92
AddWorkerFunction · 0.85
newTestClientFunction · 0.85
CleanupMethod · 0.80
InsertMethod · 0.80
DoneMethod · 0.80
AfterMethod · 0.80
newTestConfigFunction · 0.70
startClientFunction · 0.70

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…