(t *testing.T)
| 66 | } |
| 67 | |
| 68 | func TestQueueMaintainer(t *testing.T) { |
| 69 | t.Parallel() |
| 70 | |
| 71 | ctx := context.Background() |
| 72 | |
| 73 | setup := func(t *testing.T, services []startstop.Service) *QueueMaintainer { |
| 74 | t.Helper() |
| 75 | |
| 76 | maintainer := NewQueueMaintainer(riversharedtest.BaseServiceArchetype(t), services) |
| 77 | maintainer.StaggerStartupDisable(true) |
| 78 | |
| 79 | return maintainer |
| 80 | } |
| 81 | |
| 82 | t.Run("StartStop", func(t *testing.T) { |
| 83 | t.Parallel() |
| 84 | |
| 85 | testSvc := newTestService(t) |
| 86 | maintainer := setup(t, []startstop.Service{testSvc}) |
| 87 | |
| 88 | require.NoError(t, maintainer.Start(ctx)) |
| 89 | testSvc.testSignals.started.WaitOrTimeout() |
| 90 | maintainer.Stop() |
| 91 | testSvc.testSignals.returning.WaitOrTimeout() |
| 92 | }) |
| 93 | |
| 94 | t.Run("StartStopStress", func(t *testing.T) { |
| 95 | t.Parallel() |
| 96 | |
| 97 | tx := riverdbtest.TestTxPgx(ctx, t) |
| 98 | sharedTx := sharedtx.NewSharedTx(tx) |
| 99 | |
| 100 | archetype := riversharedtest.BaseServiceArchetype(t) |
| 101 | archetype.Logger = riversharedtest.LoggerWarn(t) // loop started/stop log is very noisy; suppress |
| 102 | |
| 103 | driver := riverpgxv5.New(nil).UnwrapExecutor(sharedTx) |
| 104 | |
| 105 | periodicJobEnqueuer, err := NewPeriodicJobEnqueuer(archetype, &PeriodicJobEnqueuerConfig{ |
| 106 | PeriodicJobs: []*PeriodicJob{ |
| 107 | { |
| 108 | ConstructorFunc: func() (*rivertype.JobInsertParams, error) { |
| 109 | return nil, ErrNoJobToInsert |
| 110 | }, |
| 111 | ScheduleFunc: cron.Every(15 * time.Minute).Next, |
| 112 | }, |
| 113 | }, |
| 114 | }, driver) |
| 115 | require.NoError(t, err) |
| 116 | |
| 117 | // Use realistic services in this one so we can verify stress not only |
| 118 | // on the queue maintainer, but it and all its subservices together. |
| 119 | maintainer := setup(t, []startstop.Service{ |
| 120 | NewJobCleaner(archetype, &JobCleanerConfig{}, driver), |
| 121 | periodicJobEnqueuer, |
| 122 | NewQueueCleaner(archetype, &QueueCleanerConfig{}, driver), |
| 123 | NewJobScheduler(archetype, &JobSchedulerConfig{}, driver), |
| 124 | }) |
| 125 | maintainer.Logger = riversharedtest.LoggerWarn(t) // loop started/stop log is very noisy; suppress |
nothing calls this directly
no test coverage detected
searching dependent graphs…