MCPcopy Index your code
hub / github.com/rilldata/rill / Run

Method Run

runtime/controller.go:174–392  ·  view source on GitHub ↗

Run starts and runs the controller's event loop. It returns when ctx is cancelled or an unrecoverable error occurs. Before returning, it closes the controller, so it must only be called once. The event loop schedules/invokes resource reconciliation and periodically flushes catalog changes to persist

(ctx context.Context)

Source from the content-addressed store, hash-verified

172// The implementation centers around these internal functions: enqueue, processQueue (uses markPending, trySchedule, invoke), and processCompletedInvocation.
173// See their docstrings for further details.
174func (c *Controller) Run(ctx context.Context) error {
175 // Initially enqueue all resources
176 c.mu.Lock()
177 for _, rs := range c.catalog.resources {
178 for _, r := range rs {
179 c.enqueue(r.Meta.Name)
180 }
181 }
182 c.mu.Unlock()
183
184 // Ticker for periodically flushing catalog changes
185 flushTicker := time.NewTicker(10 * time.Second)
186 defer flushTicker.Stop()
187
188 // Ticker for periodically checking for hanging reconciles that don't respond to cancelation
189 hangingTicker := time.NewTicker(time.Minute)
190 defer hangingTicker.Stop()
191
192 // Timer for scheduling resources added to c.timeline.
193 // Call resetTimelineTimer whenever the timeline may have been changed (must hold mu).
194 timelineTimer := time.NewTimer(time.Second)
195 defer timelineTimer.Stop()
196 timelineTimer.Stop() // We want it stopped initially
197 nextTime := time.Time{}
198 resetTimelineTimer := func() {
199 _, t := c.timeline.Peek()
200 if t.Equal(nextTime) {
201 return
202 }
203 nextTime = t
204
205 timelineTimer.Stop()
206 if t.IsZero() {
207 return
208 }
209
210 d := time.Until(t)
211 if d <= 0 {
212 // must be positive
213 d = time.Nanosecond
214 }
215
216 d += time.Second // Add a second to avoid rapid cancellations due to micro differences in schedule time
217 timelineTimer.Reset(d)
218 }
219
220 // Run event loop
221 var stop bool
222 var loopErr error
223 for !stop {
224 select {
225 case <-c.queueUpdatedCh: // There are resources we should schedule
226 c.mu.Lock()
227 err := c.processQueue()
228 if err != nil {
229 loopErr = err
230 stop = true
231 } else {

Calls 15

enqueueMethod · 0.95
processQueueMethod · 0.95
checkIdleWaitsMethod · 0.95
WithMinimumDurationFunction · 0.92
RLockMethod · 0.80
RUnlockMethod · 0.80
resetEventsMethod · 0.80
DoneMethod · 0.80
ErrMethod · 0.65
ErrorfMethod · 0.65
StringMethod · 0.65