MCPcopy
hub / github.com/dgraph-io/dgraph / ProcessDelta

Method ProcessDelta

posting/oracle.go:309–351  ·  view source on GitHub ↗
(delta *pb.OracleDelta)

Source from the content-addressed store, hash-verified

307}
308
309func (o *oracle) ProcessDelta(delta *pb.OracleDelta) {
310 if glog.V(3) {
311 glog.Infof("ProcessDelta: Max Assigned: %d", delta.MaxAssigned)
312 glog.Infof("ProcessDelta: Group checksum: %v", delta.GroupChecksums)
313 for _, txn := range delta.Txns {
314 if txn.CommitTs == 0 {
315 glog.Infof("ProcessDelta Aborted: %d", txn.StartTs)
316 } else {
317 glog.Infof("ProcessDelta Committed: %d -> %d", txn.StartTs, txn.CommitTs)
318 }
319 }
320 }
321
322 o.Lock()
323 defer o.Unlock()
324 for _, status := range delta.Txns {
325 txn := o.pendingTxns[status.StartTs]
326 if txn != nil && status.CommitTs > 0 {
327 for k := range txn.cache.deltas {
328 IncrRollup.addKeyToBatch([]byte(k), 0)
329 }
330 }
331 delete(o.pendingTxns, status.StartTs)
332 }
333 curMax := o.MaxAssigned()
334 if delta.MaxAssigned < curMax {
335 return
336 }
337
338 // Notify the waiting cattle.
339 for startTs, toNotify := range o.waiters {
340 if startTs > delta.MaxAssigned {
341 continue
342 }
343 for _, ch := range toNotify {
344 close(ch)
345 }
346 delete(o.waiters, startTs)
347 }
348 x.AssertTrue(atomic.CompareAndSwapUint64(&o.maxAssigned, curMax, delta.MaxAssigned))
349 ostats.Record(context.Background(),
350 x.MaxAssignedTs.M(int64(delta.MaxAssigned))) // Can't access o.MaxAssigned without atomics.
351}
352
353func (o *oracle) ResetTxns() {
354 o.Lock()

Callers 4

TestExportRdfFunction · 0.80
TestExportJsonFunction · 0.80
commitTsFunction · 0.80
commitOrAbortMethod · 0.80

Calls 7

MaxAssignedMethod · 0.95
AssertTrueFunction · 0.92
InfofMethod · 0.80
addKeyToBatchMethod · 0.80
RecordMethod · 0.80
LockMethod · 0.45
UnlockMethod · 0.45

Tested by 3

TestExportRdfFunction · 0.64
TestExportJsonFunction · 0.64
commitTsFunction · 0.64