(t *testing.T)
| 7359 | } |
| 7360 | |
| 7361 | func Test_Client_InsertTriggersImmediateWork(t *testing.T) { |
| 7362 | t.Parallel() |
| 7363 | |
| 7364 | ctx := context.Background() |
| 7365 | |
| 7366 | ctx, cancel := context.WithTimeout(ctx, 10*time.Second) |
| 7367 | t.Cleanup(cancel) |
| 7368 | |
| 7369 | var ( |
| 7370 | dbPool = riversharedtest.DBPool(ctx, t) |
| 7371 | driver = riverpgxv5.New(dbPool) |
| 7372 | schema = riverdbtest.TestSchema(ctx, t, driver, nil) |
| 7373 | config = newTestConfig(t, schema) |
| 7374 | ) |
| 7375 | config.FetchCooldown = 20 * time.Millisecond |
| 7376 | config.FetchPollInterval = 20 * time.Second // essentially disable polling |
| 7377 | config.Queues = map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}, "another_queue": {MaxWorkers: 1}} |
| 7378 | |
| 7379 | type JobArgs struct { |
| 7380 | testutil.JobArgsReflectKind[JobArgs] |
| 7381 | } |
| 7382 | |
| 7383 | doneCh := make(chan struct{}) |
| 7384 | close(doneCh) // don't need to block any jobs from completing |
| 7385 | startedCh := make(chan int64) |
| 7386 | AddWorker(config.Workers, makeAwaitWorker[JobArgs](startedCh, doneCh)) |
| 7387 | |
| 7388 | client := newTestClient(t, dbPool, config) |
| 7389 | |
| 7390 | startClient(ctx, t, client) |
| 7391 | riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) |
| 7392 | |
| 7393 | insertRes, err := client.Insert(ctx, JobArgs{}, nil) |
| 7394 | require.NoError(t, err) |
| 7395 | |
| 7396 | // Wait for the client to be ready by waiting for a job to be executed: |
| 7397 | select { |
| 7398 | case jobID := <-startedCh: |
| 7399 | require.Equal(t, insertRes.Job.ID, jobID) |
| 7400 | case <-ctx.Done(): |
| 7401 | t.Fatal("timed out waiting for warmup job to start") |
| 7402 | } |
| 7403 | |
| 7404 | // Now that we've run one job, we shouldn't take longer than the cooldown to |
| 7405 | // fetch another after insertion. LISTEN/NOTIFY should ensure we find out |
| 7406 | // about the inserted job much faster than the poll interval. |
| 7407 | // |
| 7408 | // Note: we specifically use a different queue to ensure that the notify |
| 7409 | // limiter is immediately able to fire on this queue. |
| 7410 | insertRes2, err := client.Insert(ctx, JobArgs{}, &InsertOpts{Queue: "another_queue"}) |
| 7411 | require.NoError(t, err) |
| 7412 | |
| 7413 | select { |
| 7414 | case jobID := <-startedCh: |
| 7415 | require.Equal(t, insertRes2.Job.ID, jobID) |
| 7416 | // As long as this is meaningfully shorter than the poll interval, we can be |
| 7417 | // sure the re-fetch came from listen/notify. |
| 7418 | case <-time.After(5 * time.Second): |
nothing calls this directly
no test coverage detected
searching dependent graphs…