MCPcopy
hub / github.com/dgraph-io/dgraph / sendDeltasToSubscribers

Method sendDeltasToSubscribers

dgraph/cmd/zero/oracle.go:185–248  ·  view source on GitHub ↗

sendDeltasToSubscribers reads updates from the o.updates channel, constructs a delta object containing transactions from one or more updates, and sends the delta object to each subscriber's channel.

()

Source from the content-addressed store, hash-verified

183// constructs a delta object containing transactions from one or more updates
184// and sends the delta object to each subscriber's channel
185func (o *Oracle) sendDeltasToSubscribers() {
186 delta := &pb.OracleDelta{}
187 ticker := time.Tick(time.Second)
188
189 // waitFor calculates the maximum value of delta.MaxAssigned and all the CommitTs of delta.Txns
190 waitFor := func() uint64 {
191 w := delta.MaxAssigned
192 for _, txn := range delta.Txns {
193 w = x.Max(w, txn.CommitTs)
194 }
195 return w
196 }
197
198 for {
199 get_update:
200 var update *pb.OracleDelta
201 select {
202 case update = <-o.updates:
203 case <-ticker:
204 wait := waitFor()
205 if wait == 0 || o.doneUntil.DoneUntil() < wait {
206 goto get_update
207 }
208 // Send empty update.
209 update = &pb.OracleDelta{}
210 }
211 slurp_loop:
212 for {
213 delta.MaxAssigned = x.Max(delta.MaxAssigned, update.MaxAssigned)
214 delta.Txns = append(delta.Txns, update.Txns...)
215 select {
216 case update = <-o.updates:
217 default:
218 break slurp_loop
219 }
220 }
221 // No need to sort the txn updates here. Alpha would sort them before
222 // applying.
223
224 // Let's ensure that we have all the commits up until the max here.
225 // Otherwise, we'll be sending commit timestamps out of order, which
226 // would cause Alphas to drop some of them, during writes to Badger.
227 if o.doneUntil.DoneUntil() < waitFor() {
228 continue // The for loop doing blocking reads from o.updates.
229 // We need at least one entry from the updates channel to pick up a missing update.
230 // Don't goto slurp_loop, because it would break from select immediately.
231 }
232
233 if glog.V(3) {
234 glog.Infof("DoneUntil: %d. Sending delta: %+v\n", o.doneUntil.DoneUntil(), delta)
235 }
236 o.Lock()
237 for id, ch := range o.subscribers {
238 select {
239 case ch <- *delta:
240 default:
241 close(ch)
242 delete(o.subscribers, id)

Callers 1

InitMethod · 0.95

Calls 4

MaxFunction · 0.92
InfofMethod · 0.80
LockMethod · 0.45
UnlockMethod · 0.45

Tested by

no test coverage detected