| 307 | } |
| 308 | |
| 309 | func (o *oracle) ProcessDelta(delta *pb.OracleDelta) { |
| 310 | if glog.V(3) { |
| 311 | glog.Infof("ProcessDelta: Max Assigned: %d", delta.MaxAssigned) |
| 312 | glog.Infof("ProcessDelta: Group checksum: %v", delta.GroupChecksums) |
| 313 | for _, txn := range delta.Txns { |
| 314 | if txn.CommitTs == 0 { |
| 315 | glog.Infof("ProcessDelta Aborted: %d", txn.StartTs) |
| 316 | } else { |
| 317 | glog.Infof("ProcessDelta Committed: %d -> %d", txn.StartTs, txn.CommitTs) |
| 318 | } |
| 319 | } |
| 320 | } |
| 321 | |
| 322 | o.Lock() |
| 323 | defer o.Unlock() |
| 324 | for _, status := range delta.Txns { |
| 325 | txn := o.pendingTxns[status.StartTs] |
| 326 | if txn != nil && status.CommitTs > 0 { |
| 327 | for k := range txn.cache.deltas { |
| 328 | IncrRollup.addKeyToBatch([]byte(k), 0) |
| 329 | } |
| 330 | } |
| 331 | delete(o.pendingTxns, status.StartTs) |
| 332 | } |
| 333 | curMax := o.MaxAssigned() |
| 334 | if delta.MaxAssigned < curMax { |
| 335 | return |
| 336 | } |
| 337 | |
| 338 | // Notify the waiting cattle. |
| 339 | for startTs, toNotify := range o.waiters { |
| 340 | if startTs > delta.MaxAssigned { |
| 341 | continue |
| 342 | } |
| 343 | for _, ch := range toNotify { |
| 344 | close(ch) |
| 345 | } |
| 346 | delete(o.waiters, startTs) |
| 347 | } |
| 348 | x.AssertTrue(atomic.CompareAndSwapUint64(&o.maxAssigned, curMax, delta.MaxAssigned)) |
| 349 | ostats.Record(context.Background(), |
| 350 | x.MaxAssignedTs.M(int64(delta.MaxAssigned))) // Can't access o.MaxAssigned without atomics. |
| 351 | } |
| 352 | |
| 353 | func (o *oracle) ResetTxns() { |
| 354 | o.Lock() |