(ops <-chan Op, seqns chan<- int64, watches chan<- int)
| 216 | } |
| 217 | |
| 218 | func (st *Store) process(ops <-chan Op, seqns chan<- int64, watches chan<- int) { |
| 219 | defer st.closeWatches() |
| 220 | |
| 221 | for { |
| 222 | var flush bool |
| 223 | ver, values := st.state.ver, st.state.root |
| 224 | |
| 225 | // Take any incoming requests and queue them up. |
| 226 | select { |
| 227 | case a, ok := <-ops: |
| 228 | if !ok { |
| 229 | return |
| 230 | } |
| 231 | |
| 232 | if a.Seqn > ver { |
| 233 | heap.Push(st.todo, a) |
| 234 | } |
| 235 | case w := <-st.watchCh: |
| 236 | n, ws := w.rev, []*watch{w} |
| 237 | for ; len(ws) > 0 && n < st.head; n++ { |
| 238 | ws = []*watch{} |
| 239 | } |
| 240 | for ; len(ws) > 0 && n <= ver; n++ { |
| 241 | ws = st.notify(st.log[n], ws) |
| 242 | } |
| 243 | |
| 244 | st.watches = append(st.watches, ws...) |
| 245 | case seqn := <-st.cleanCh: |
| 246 | for ; st.head <= seqn; st.head++ { |
| 247 | st.log[st.head] = Event{}, false |
| 248 | } |
| 249 | case seqns <- ver: |
| 250 | // nothing to do here |
| 251 | case watches <- len(st.watches): |
| 252 | // nothing to do here |
| 253 | case flush = <-st.flush: |
| 254 | // nothing |
| 255 | } |
| 256 | |
| 257 | var ev Event |
| 258 | // If we have any mutations that can be applied, do them. |
| 259 | for st.todo.Len() > 0 { |
| 260 | t := st.todo.At(0).(Op) |
| 261 | if flush && ver < t.Seqn { |
| 262 | ver = t.Seqn - 1 |
| 263 | } |
| 264 | if t.Seqn > ver+1 { |
| 265 | break |
| 266 | } |
| 267 | |
| 268 | heap.Pop(st.todo) |
| 269 | if t.Seqn < ver+1 { |
| 270 | continue |
| 271 | } |
| 272 | |
| 273 | values, ev = values.apply(t.Seqn, t.Mut) |
| 274 | st.state = &state{ev.Seqn, values} |
| 275 | ver = ev.Seqn |
no test coverage detected