| 100 | } |
| 101 | |
| 102 | func (s *QueueCleaner) Start(ctx context.Context) error { |
| 103 | ctx, shouldStart, started, stopped := s.StartInit(ctx) |
| 104 | if !shouldStart { |
| 105 | return nil |
| 106 | } |
| 107 | |
| 108 | s.StaggerStart(ctx) |
| 109 | |
| 110 | go func() { |
| 111 | started() |
| 112 | defer stopped() // this defer should come first so it's last out |
| 113 | |
| 114 | s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStarted) |
| 115 | defer s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStopped) |
| 116 | |
| 117 | ticker := timeutil.NewTickerWithInitialTick(ctx, s.Config.Interval) |
| 118 | for { |
| 119 | select { |
| 120 | case <-ctx.Done(): |
| 121 | return |
| 122 | case <-ticker.C: |
| 123 | } |
| 124 | |
| 125 | res, err := s.runOnce(ctx) |
| 126 | if err != nil { |
| 127 | if !errors.Is(err, context.Canceled) { |
| 128 | s.Logger.ErrorContext(ctx, s.Name+": Error cleaning queues", slog.String("error", err.Error())) |
| 129 | } |
| 130 | continue |
| 131 | } |
| 132 | |
| 133 | if len(res.QueuesDeleted) > 0 { |
| 134 | s.Logger.InfoContext(ctx, s.Name+riversharedmaintenance.LogPrefixRanSuccessfully, |
| 135 | slog.String("queues_deleted", strings.Join(res.QueuesDeleted, ",")), |
| 136 | ) |
| 137 | } |
| 138 | } |
| 139 | }() |
| 140 | |
| 141 | return nil |
| 142 | } |
| 143 | |
| 144 | func (s *QueueCleaner) batchSize() int { |
| 145 | if s.reducedBatchSizeBreaker.Open() { |