| 153 | } |
| 154 | |
| 155 | func (s *QueueCleaner) runOnce(ctx context.Context) (*queueCleanerRunOnceResult, error) { |
| 156 | res := &queueCleanerRunOnceResult{QueuesDeleted: make([]string, 0, 10)} |
| 157 | |
| 158 | for { |
| 159 | // Wrapped in a function so that defers run as expected. |
| 160 | queuesDeleted, err := func() ([]string, error) { |
| 161 | ctx, cancelFunc := context.WithTimeout(ctx, riversharedmaintenance.TimeoutDefault) |
| 162 | defer cancelFunc() |
| 163 | |
| 164 | queuesDeleted, err := s.exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{ |
| 165 | Max: s.batchSize(), |
| 166 | Schema: s.Config.Schema, |
| 167 | UpdatedAtHorizon: time.Now().Add(-s.Config.RetentionPeriod), |
| 168 | }) |
| 169 | if err != nil { |
| 170 | return nil, fmt.Errorf("error deleting expired queues: %w", err) |
| 171 | } |
| 172 | |
| 173 | s.reducedBatchSizeBreaker.ResetIfNotOpen() |
| 174 | |
| 175 | return queuesDeleted, nil |
| 176 | }() |
| 177 | if err != nil { |
| 178 | if errors.Is(err, context.DeadlineExceeded) { |
| 179 | s.reducedBatchSizeBreaker.Trip() |
| 180 | } |
| 181 | |
| 182 | return nil, err |
| 183 | } |
| 184 | |
| 185 | s.TestSignals.DeletedBatch.Signal(struct{}{}) |
| 186 | |
| 187 | res.QueuesDeleted = append(res.QueuesDeleted, queuesDeleted...) |
| 188 | // Deleted was less than query `LIMIT` which means work is done. |
| 189 | if len(queuesDeleted) < s.batchSize() { |
| 190 | break |
| 191 | } |
| 192 | |
| 193 | serviceutil.CancellableSleep(ctx, randutil.DurationBetween(riversharedmaintenance.BatchBackoffMin, riversharedmaintenance.BatchBackoffMax)) |
| 194 | } |
| 195 | |
| 196 | return res, nil |
| 197 | } |