(t *testing.T)
| 25 | ) |
| 26 | |
| 27 | func Test_SubscriptionManager(t *testing.T) { |
| 28 | t.Parallel() |
| 29 | |
| 30 | ctx := context.Background() |
| 31 | |
| 32 | type testBundle struct { |
| 33 | exec riverdriver.Executor |
| 34 | subscribeCh chan []jobcompleter.CompleterJobUpdated |
| 35 | tx pgx.Tx |
| 36 | } |
| 37 | |
| 38 | setup := func(t *testing.T) (*subscriptionManager, *testBundle) { |
| 39 | t.Helper() |
| 40 | |
| 41 | tx := riverdbtest.TestTxPgx(ctx, t) |
| 42 | exec := riverpgxv5.New(nil).UnwrapExecutor(tx) |
| 43 | |
| 44 | subscribeCh := make(chan []jobcompleter.CompleterJobUpdated, 1) |
| 45 | manager := newSubscriptionManager(riversharedtest.BaseServiceArchetype(t), subscribeCh) |
| 46 | |
| 47 | require.NoError(t, manager.Start(ctx)) |
| 48 | t.Cleanup(manager.Stop) |
| 49 | |
| 50 | return manager, &testBundle{ |
| 51 | exec: exec, |
| 52 | subscribeCh: subscribeCh, |
| 53 | tx: tx, |
| 54 | } |
| 55 | } |
| 56 | |
| 57 | t.Run("DistributesRequestedEventsToSubscribers", func(t *testing.T) { |
| 58 | t.Parallel() |
| 59 | |
| 60 | manager, bundle := setup(t) |
| 61 | t.Cleanup(func() { close(bundle.subscribeCh) }) |
| 62 | |
| 63 | sub, cancelSub := manager.SubscribeConfig(&SubscribeConfig{ChanSize: 10, Kinds: []EventKind{EventKindJobCompleted, EventKindJobSnoozed}}) |
| 64 | t.Cleanup(cancelSub) |
| 65 | |
| 66 | // Send some events |
| 67 | job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(time.Now())}) |
| 68 | job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(time.Now())}) |
| 69 | job3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRetryable)}) |
| 70 | job4 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled)}) |
| 71 | |
| 72 | makeStats := func(complete, wait, run time.Duration) *jobstats.JobStatistics { |
| 73 | return &jobstats.JobStatistics{ |
| 74 | CompleteDuration: complete, |
| 75 | QueueWaitDuration: wait, |
| 76 | RunDuration: run, |
| 77 | } |
| 78 | } |
| 79 | |
| 80 | bundle.subscribeCh <- []jobcompleter.CompleterJobUpdated{ |
| 81 | {Job: job1, JobStats: makeStats(101, 102, 103)}, // completed, should be sent |
| 82 | {Job: job2, JobStats: makeStats(201, 202, 203)}, // cancelled, should be skipped |
| 83 | } |
| 84 | bundle.subscribeCh <- []jobcompleter.CompleterJobUpdated{ |
nothing calls this directly
no test coverage detected
searching dependent graphs…