MCPcopy Index your code
hub / github.com/riverqueue/river / Start

Method Start

internal/maintenance/queue_cleaner.go:102–142  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

100}
101
102func (s *QueueCleaner) Start(ctx context.Context) error {
103 ctx, shouldStart, started, stopped := s.StartInit(ctx)
104 if !shouldStart {
105 return nil
106 }
107
108 s.StaggerStart(ctx)
109
110 go func() {
111 started()
112 defer stopped() // this defer should come first so it's last out
113
114 s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStarted)
115 defer s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStopped)
116
117 ticker := timeutil.NewTickerWithInitialTick(ctx, s.Config.Interval)
118 for {
119 select {
120 case <-ctx.Done():
121 return
122 case <-ticker.C:
123 }
124
125 res, err := s.runOnce(ctx)
126 if err != nil {
127 if !errors.Is(err, context.Canceled) {
128 s.Logger.ErrorContext(ctx, s.Name+": Error cleaning queues", slog.String("error", err.Error()))
129 }
130 continue
131 }
132
133 if len(res.QueuesDeleted) > 0 {
134 s.Logger.InfoContext(ctx, s.Name+riversharedmaintenance.LogPrefixRanSuccessfully,
135 slog.String("queues_deleted", strings.Join(res.QueuesDeleted, ",")),
136 )
137 }
138 }
139 }()
140
141 return nil
142}
143
144func (s *QueueCleaner) batchSize() int {
145 if s.reducedBatchSizeBreaker.Open() {

Callers 1

TestQueueCleaner (Function) · 0.95

Calls 7

runOnce (Method) · 0.95
NewTickerWithInitialTick (Function) · 0.92
StartInit (Method) · 0.80
StaggerStart (Method) · 0.80
Done (Method) · 0.80
Is (Method) · 0.45
Error (Method) · 0.45

Tested by 1

TestQueueCleaner (Function) · 0.76