MCPcopy
hub / github.com/ha/doozerd / process

Method process

src/pkg/store/store.go:218–289  ·  view source on GitHub ↗
(ops <-chan Op, seqns chan<- int64, watches chan<- int)

Source from the content-addressed store, hash-verified

216}
217
218func (st *Store) process(ops <-chan Op, seqns chan<- int64, watches chan<- int) {
219 defer st.closeWatches()
220
221 for {
222 var flush bool
223 ver, values := st.state.ver, st.state.root
224
225 // Take any incoming requests and queue them up.
226 select {
227 case a, ok := <-ops:
228 if !ok {
229 return
230 }
231
232 if a.Seqn > ver {
233 heap.Push(st.todo, a)
234 }
235 case w := <-st.watchCh:
236 n, ws := w.rev, []*watch{w}
237 for ; len(ws) > 0 && n < st.head; n++ {
238 ws = []*watch{}
239 }
240 for ; len(ws) > 0 && n <= ver; n++ {
241 ws = st.notify(st.log[n], ws)
242 }
243
244 st.watches = append(st.watches, ws...)
245 case seqn := <-st.cleanCh:
246 for ; st.head <= seqn; st.head++ {
247 st.log[st.head] = Event{}, false
248 }
249 case seqns <- ver:
250 // nothing to do here
251 case watches <- len(st.watches):
252 // nothing to do here
253 case flush = <-st.flush:
254 // nothing
255 }
256
257 var ev Event
258 // If we have any mutations that can be applied, do them.
259 for st.todo.Len() > 0 {
260 t := st.todo.At(0).(Op)
261 if flush && ver < t.Seqn {
262 ver = t.Seqn - 1
263 }
264 if t.Seqn > ver+1 {
265 break
266 }
267
268 heap.Pop(st.todo)
269 if t.Seqn < ver+1 {
270 continue
271 }
272
273 values, ev = values.apply(t.Seqn, t.Mut)
274 st.state = &state{ev.Seqn, values}
275 ver = ev.Seqn

Callers 1

New (Function) · 0.95

Calls 3

closeWatches (Method) · 0.95
notify (Method) · 0.95
apply (Method) · 0.80

Tested by

no test coverage detected