MCPcopy Index your code
hub / github.com/riverqueue/river / runOnce

Method runOnce

internal/maintenance/queue_cleaner.go:155–197  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

153}
154
155func (s *QueueCleaner) runOnce(ctx context.Context) (*queueCleanerRunOnceResult, error) {
156 res := &queueCleanerRunOnceResult{QueuesDeleted: make([]string, 0, 10)}
157
158 for {
159 // Wrapped in a function so that defers run as expected.
160 queuesDeleted, err := func() ([]string, error) {
161 ctx, cancelFunc := context.WithTimeout(ctx, riversharedmaintenance.TimeoutDefault)
162 defer cancelFunc()
163
164 queuesDeleted, err := s.exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{
165 Max: s.batchSize(),
166 Schema: s.Config.Schema,
167 UpdatedAtHorizon: time.Now().Add(-s.Config.RetentionPeriod),
168 })
169 if err != nil {
170 return nil, fmt.Errorf("error deleting expired queues: %w", err)
171 }
172
173 s.reducedBatchSizeBreaker.ResetIfNotOpen()
174
175 return queuesDeleted, nil
176 }()
177 if err != nil {
178 if errors.Is(err, context.DeadlineExceeded) {
179 s.reducedBatchSizeBreaker.Trip()
180 }
181
182 return nil, err
183 }
184
185 s.TestSignals.DeletedBatch.Signal(struct{}{})
186
187 res.QueuesDeleted = append(res.QueuesDeleted, queuesDeleted...)
188 // Deleted was less than query `LIMIT` which means work is done.
189 if len(queuesDeleted) < s.batchSize() {
190 break
191 }
192
193 serviceutil.CancellableSleep(ctx, randutil.DurationBetween(riversharedmaintenance.BatchBackoffMin, riversharedmaintenance.BatchBackoffMax))
194 }
195
196 return res, nil
197}

Callers 2

StartMethod · 0.95
TestQueueCleanerFunction · 0.95

Calls 11

batchSizeMethod · 0.95
CancellableSleepFunction · 0.92
DurationBetweenFunction · 0.92
ResetIfNotOpenMethod · 0.80
TripMethod · 0.80
SignalMethod · 0.80
QueueDeleteExpiredMethod · 0.65
NowMethod · 0.65
ErrorfMethod · 0.65
AddMethod · 0.45
IsMethod · 0.45

Tested by 1

TestQueueCleanerFunction · 0.76