(t *testing.T)
| 2440 | } |
| 2441 | |
| 2442 | func Test_Client_StopAndCancel(t *testing.T) { |
| 2443 | t.Parallel() |
| 2444 | |
| 2445 | ctx := context.Background() |
| 2446 | |
| 2447 | type JobArgs struct { |
| 2448 | testutil.JobArgsReflectKind[JobArgs] |
| 2449 | } |
| 2450 | |
| 2451 | type testBundle struct { |
| 2452 | jobDoneChan chan struct{} |
| 2453 | jobStartedChan chan int64 |
| 2454 | } |
| 2455 | |
| 2456 | setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) { |
| 2457 | t.Helper() |
| 2458 | /* setup builds a test config, registers a JobArgs worker that blocks until its context is done, and returns the client from runNewTestClient together with the worker's two signalling channels. */ |
| 2459 | config := newTestConfig(t, "") |
| 2460 | /* Both channels are unbuffered, so the worker's send of job.ID blocks until a test receives from jobStartedChan. */ |
| 2461 | var ( |
| 2462 | jobStartedChan = make(chan int64) |
| 2463 | jobDoneChan = make(chan struct{}) |
| 2464 | ) |
| 2465 | AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { |
| 2466 | jobStartedChan <- job.ID |
| 2467 | t.Logf("Job waiting for context cancellation") |
| 2468 | defer t.Logf("Job finished") |
| 2469 | <-ctx.Done() |
| 2470 | t.Logf("Job context done, closing chan and returning") |
| 2471 | close(jobDoneChan) |
| 2472 | return nil |
| 2473 | })) |
| 2474 | /* NOTE(review): close(jobDoneChan) would panic if the worker body ran more than once for the same setup; presumably each subtest works a single job exactly once — confirm. */ |
| 2475 | client := runNewTestClient(ctx, t, config) |
| 2476 | |
| 2477 | return client, &testBundle{ |
| 2478 | jobDoneChan: jobDoneChan, |
| 2479 | jobStartedChan: jobStartedChan, |
| 2480 | } |
| 2481 | } |
| 2482 | |
| 2483 | t.Run("OnItsOwn", func(t *testing.T) { |
| 2484 | t.Parallel() |
| 2485 | /* Inserts one job, waits for the worker to report that it started, then requires StopAndCancel to return no error and waits on client.Stopped(). */ |
| 2486 | client, bundle := setup(t) |
| 2487 | /* NOTE(review): setup already obtained the client from runNewTestClient; whether that helper also starts the client is not visible here — confirm this startClient call is not redundant. */ |
| 2488 | startClient(ctx, t, client) |
| 2489 | |
| 2490 | _, err := client.Insert(ctx, JobArgs{}, nil) |
| 2491 | require.NoError(t, err) |
| 2492 | /* The worker sends job.ID on jobStartedChan before blocking on its context, so this receive means the job is in flight when StopAndCancel is called. */ |
| 2493 | riversharedtest.WaitOrTimeout(t, bundle.jobStartedChan) |
| 2494 | /* The worker only returns after its ctx is done, so presumably StopAndCancel is what cancels it; bundle.jobDoneChan is not checked in this subtest. */ |
| 2495 | require.NoError(t, client.StopAndCancel(ctx)) |
| 2496 | riversharedtest.WaitOrTimeout(t, client.Stopped()) |
| 2497 | }) |
| 2498 | |
| 2499 | t.Run("BeforeStart", func(t *testing.T) { |
nothing calls this directly
no test coverage detected
searching dependent graphs…