(t *testing.T)
| 2539 | } |
| 2540 | |
| 2541 | func Test_Client_SoftStopTimeout(t *testing.T) { |
| 2542 | t.Parallel() |
| 2543 | |
| 2544 | ctx := context.Background() |
| 2545 | |
| 2546 | type JobArgs struct { |
| 2547 | testutil.JobArgsReflectKind[JobArgs] |
| 2548 | } |
| 2549 | |
| 2550 | t.Run("EscalatesToHardStopAfterTimeout", func(t *testing.T) { |
| 2551 | t.Parallel() |
| 2552 | |
| 2553 | config := newTestConfig(t, "") |
| 2554 | config.SoftStopTimeout = 100 * time.Millisecond |
| 2555 | |
| 2556 | jobDoneChan := make(chan struct{}) |
| 2557 | jobStartedChan := make(chan struct{}) |
| 2558 | AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { |
| 2559 | close(jobStartedChan) |
| 2560 | <-ctx.Done() // only finishes when context is cancelled |
| 2561 | close(jobDoneChan) |
| 2562 | return nil |
| 2563 | })) |
| 2564 | |
| 2565 | client := runNewTestClient(ctx, t, config) |
| 2566 | |
| 2567 | _, err := client.Insert(ctx, JobArgs{}, nil) |
| 2568 | require.NoError(t, err) |
| 2569 | |
| 2570 | riversharedtest.WaitOrTimeout(t, jobStartedChan) |
| 2571 | |
| 2572 | // Stop initiates a soft stop. The job won't finish on its own, but |
| 2573 | // SoftStopTimeout should escalate to a hard stop after 100ms. |
| 2574 | require.NoError(t, client.Stop(ctx)) |
| 2575 | |
| 2576 | // Verify the job's context was indeed cancelled. |
| 2577 | select { |
| 2578 | case <-jobDoneChan: |
| 2579 | default: |
| 2580 | t.Fatal("expected job to have been cancelled by soft stop timeout") |
| 2581 | } |
| 2582 | }) |
| 2583 | |
| 2584 | t.Run("SoftStopSucceedsBeforeTimeout", func(t *testing.T) { |
| 2585 | t.Parallel() |
| 2586 | |
| 2587 | config := newTestConfig(t, "") |
| 2588 | config.SoftStopTimeout = 5 * time.Second |
| 2589 | |
| 2590 | jobStartedChan := make(chan struct{}) |
| 2591 | AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { |
| 2592 | close(jobStartedChan) |
| 2593 | return nil // finishes immediately |
| 2594 | })) |
| 2595 | |
| 2596 | client := runNewTestClient(ctx, t, config) |
| 2597 | |
| 2598 | _, err := client.Insert(ctx, JobArgs{}, nil) |
nothing calls this directly
no test coverage detected
searching dependent graphs…