MCPcopy Index your code
hub / github.com/riverqueue/river / Test_SubscriptionManager

Function Test_SubscriptionManager

subscription_manager_test.go:27–199  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

25)
26
27func Test_SubscriptionManager(t *testing.T) {
28 t.Parallel()
29
30 ctx := context.Background()
31
32 type testBundle struct {
33 exec riverdriver.Executor
34 subscribeCh chan []jobcompleter.CompleterJobUpdated
35 tx pgx.Tx
36 }
37
38 setup := func(t *testing.T) (*subscriptionManager, *testBundle) {
39 t.Helper()
40
41 tx := riverdbtest.TestTxPgx(ctx, t)
42 exec := riverpgxv5.New(nil).UnwrapExecutor(tx)
43
44 subscribeCh := make(chan []jobcompleter.CompleterJobUpdated, 1)
45 manager := newSubscriptionManager(riversharedtest.BaseServiceArchetype(t), subscribeCh)
46
47 require.NoError(t, manager.Start(ctx))
48 t.Cleanup(manager.Stop)
49
50 return manager, &testBundle{
51 exec: exec,
52 subscribeCh: subscribeCh,
53 tx: tx,
54 }
55 }
56
57 t.Run("DistributesRequestedEventsToSubscribers", func(t *testing.T) {
58 t.Parallel()
59
60 manager, bundle := setup(t)
61 t.Cleanup(func() { close(bundle.subscribeCh) })
62
63 sub, cancelSub := manager.SubscribeConfig(&SubscribeConfig{ChanSize: 10, Kinds: []EventKind{EventKindJobCompleted, EventKindJobSnoozed}})
64 t.Cleanup(cancelSub)
65
66 // Send some events
67 job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(time.Now())})
68 job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(time.Now())})
69 job3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRetryable)})
70 job4 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled)})
71
72 makeStats := func(complete, wait, run time.Duration) *jobstats.JobStatistics {
73 return &jobstats.JobStatistics{
74 CompleteDuration: complete,
75 QueueWaitDuration: wait,
76 RunDuration: run,
77 }
78 }
79
80 bundle.subscribeCh <- []jobcompleter.CompleterJobUpdated{
81 {Job: job1, JobStats: makeStats(101, 102, 103)}, // completed, should be sent
82 {Job: job2, JobStats: makeStats(201, 202, 203)}, // cancelled, should be skipped
83 }
84 bundle.subscribeCh <- []jobcompleter.CompleterJobUpdated{

Callers

nothing calls this directly

Calls 15

TestTxPgx (Function) · 0.92
New (Function) · 0.92
BaseServiceArchetype (Function) · 0.92
Job (Function) · 0.92
Ptr (Function) · 0.92
WaitOrTimeoutN (Function) · 0.92
Stress (Function) · 0.92
newSubscriptionManager (Function) · 0.85
Cleanup (Method) · 0.80
Helper (Method) · 0.65
UnwrapExecutor (Method) · 0.65

Tested by

no test coverage detected

Used in the wild — real call sites across dependent graphs

searching dependent graphs…