MCPcopy
hub / github.com/riverqueue/river / TestQueueMaintainer

Function TestQueueMaintainer

internal/maintenance/queue_maintainer_test.go:68–198  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

66}
67
68func TestQueueMaintainer(t *testing.T) {
69 t.Parallel()
70
71 ctx := context.Background()
72
73 setup := func(t *testing.T, services []startstop.Service) *QueueMaintainer {
74 t.Helper()
75
76 maintainer := NewQueueMaintainer(riversharedtest.BaseServiceArchetype(t), services)
77 maintainer.StaggerStartupDisable(true)
78
79 return maintainer
80 }
81
82 t.Run("StartStop", func(t *testing.T) {
83 t.Parallel()
84
85 testSvc := newTestService(t)
86 maintainer := setup(t, []startstop.Service{testSvc})
87
88 require.NoError(t, maintainer.Start(ctx))
89 testSvc.testSignals.started.WaitOrTimeout()
90 maintainer.Stop()
91 testSvc.testSignals.returning.WaitOrTimeout()
92 })
93
94 t.Run("StartStopStress", func(t *testing.T) {
95 t.Parallel()
96
97 tx := riverdbtest.TestTxPgx(ctx, t)
98 sharedTx := sharedtx.NewSharedTx(tx)
99
100 archetype := riversharedtest.BaseServiceArchetype(t)
101 archetype.Logger = riversharedtest.LoggerWarn(t) // loop started/stop log is very noisy; suppress
102
103 driver := riverpgxv5.New(nil).UnwrapExecutor(sharedTx)
104
105 periodicJobEnqueuer, err := NewPeriodicJobEnqueuer(archetype, &PeriodicJobEnqueuerConfig{
106 PeriodicJobs: []*PeriodicJob{
107 {
108 ConstructorFunc: func() (*rivertype.JobInsertParams, error) {
109 return nil, ErrNoJobToInsert
110 },
111 ScheduleFunc: cron.Every(15 * time.Minute).Next,
112 },
113 },
114 }, driver)
115 require.NoError(t, err)
116
117 // Use realistic services in this one so we can verify stress not only
118 // on the queue maintainer, but it and all its subservices together.
119 maintainer := setup(t, []startstop.Service{
120 NewJobCleaner(archetype, &JobCleanerConfig{}, driver),
121 periodicJobEnqueuer,
122 NewQueueCleaner(archetype, &QueueCleanerConfig{}, driver),
123 NewJobScheduler(archetype, &JobSchedulerConfig{}, driver),
124 })
125 maintainer.Logger = riversharedtest.LoggerWarn(t) // loop started/stop log is very noisy; suppress

Callers

nothing calls this directly

Calls 15

StaggerStartupDisable (Method) · 0.95
Start (Method) · 0.95
BaseServiceArchetype (Function) · 0.92
TestTxPgx (Function) · 0.92
NewSharedTx (Function) · 0.92
LoggerWarn (Function) · 0.92
New (Function) · 0.92
Stress (Function) · 0.92
WaitOrTimeout (Function) · 0.92
NewQueueMaintainer (Function) · 0.85
newTestService (Function) · 0.85
NewPeriodicJobEnqueuer (Function) · 0.85

Tested by

no test coverage detected

Used in the wild — real call sites across dependent graphs

searching dependent graphs…