Run starts and runs the controller's event loop. It returns when ctx is cancelled or an unrecoverable error occurs. Before returning, it closes the controller, so it must only be called once. The event loop schedules/invokes resource reconciliation and periodically flushes catalog changes to persist
(ctx context.Context)
| 172 | // The implementation centers around these internal functions: enqueue, processQueue (uses markPending, trySchedule, invoke), and processCompletedInvocation. |
| 173 | // See their docstrings for further details. |
| 174 | func (c *Controller) Run(ctx context.Context) error { |
| 175 | // Initially enqueue all resources |
| 176 | c.mu.Lock() |
| 177 | for _, rs := range c.catalog.resources { |
| 178 | for _, r := range rs { |
| 179 | c.enqueue(r.Meta.Name) |
| 180 | } |
| 181 | } |
| 182 | c.mu.Unlock() |
| 183 | |
| 184 | // Ticker for periodically flushing catalog changes |
| 185 | flushTicker := time.NewTicker(10 * time.Second) |
| 186 | defer flushTicker.Stop() |
| 187 | |
| 188 | // Ticker for periodically checking for hanging reconciles that don't respond to cancelation |
| 189 | hangingTicker := time.NewTicker(time.Minute) |
| 190 | defer hangingTicker.Stop() |
| 191 | |
| 192 | // Timer for scheduling resources added to c.timeline. |
| 193 | // Call resetTimelineTimer whenever the timeline may have been changed (must hold mu). |
| 194 | timelineTimer := time.NewTimer(time.Second) |
| 195 | defer timelineTimer.Stop() |
| 196 | timelineTimer.Stop() // We want it stopped initially |
| 197 | nextTime := time.Time{} |
| 198 | resetTimelineTimer := func() { |
| 199 | _, t := c.timeline.Peek() |
| 200 | if t.Equal(nextTime) { |
| 201 | return |
| 202 | } |
| 203 | nextTime = t |
| 204 | |
| 205 | timelineTimer.Stop() |
| 206 | if t.IsZero() { |
| 207 | return |
| 208 | } |
| 209 | |
| 210 | d := time.Until(t) |
| 211 | if d <= 0 { |
| 212 | // must be positive |
| 213 | d = time.Nanosecond |
| 214 | } |
| 215 | |
| 216 | d += time.Second // Add a second to avoid rapid cancellations due to micro differences in schedule time |
| 217 | timelineTimer.Reset(d) |
| 218 | } |
| 219 | |
| 220 | // Run event loop |
| 221 | var stop bool |
| 222 | var loopErr error |
| 223 | for !stop { |
| 224 | select { |
| 225 | case <-c.queueUpdatedCh: // There are resources we should schedule |
| 226 | c.mu.Lock() |
| 227 | err := c.processQueue() |
| 228 | if err != nil { |
| 229 | loopErr = err |
| 230 | stop = true |
| 231 | } else { |