MCPcopy Index your code
hub / github.com/rilldata/rill / processQueue

Method processQueue

runtime/controller.go:1068–1096  ·  view source on GitHub ↗

processQueue calls attempts to schedule the resources in c.queue. It is invoked in each iteration of the event loop when there are resources in the queue. The reason we have the queue and process it from the event loop (instead of marking pending and scheduling directly from enqueue()) is to enable

()

Source from the content-addressed store, hash-verified

1066//
1067// It must be called while c.mu is held.
1068func (c *Controller) processQueue() error {
1069 // Mark-sweep like approach - first mark all impacted resources (including descendents) pending, then schedule the ones that have no pending parents.
1070
1071 // Phase 1: Mark items pending and trim queue when possible.
1072 for s, n := range c.queue {
1073 skip, err := c.markPending(n)
1074 if err != nil {
1075 return err
1076 }
1077 if skip {
1078 delete(c.queue, s)
1079 }
1080 }
1081
1082 // Phase 2: Ensure scheduling
1083 for s, n := range c.queue {
1084 ok, err := c.trySchedule(n)
1085 if err != nil {
1086 return err
1087 }
1088 if ok {
1089 delete(c.queue, s)
1090 }
1091 }
1092
1093 // Reset queueUpdated
1094 c.queueUpdated = false
1095 return nil
1096}
1097
1098// markPending marks a resource and its descendents as pending.
1099// It also clears errors on every resource marked pending - it would be confusing to show an old error after a change has been made that may fix it.

Callers 1

RunMethod · 0.95

Calls 2

markPendingMethod · 0.95
tryScheduleMethod · 0.95

Tested by

no test coverage detected