TODO(Anurag - 4 May 2020): Are we using pkey? Remove if unused.
(pkey uint64, delta *pb.OracleDelta)
| 964 | |
| 965 | // TODO(Anurag - 4 May 2020): Are we using pkey? Remove if unused. |
| 966 | func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error { |
| 967 | x.PrintOracleDelta(delta) |
| 968 | // First let's commit all mutations to disk. |
| 969 | writer := posting.NewTxnWriter(pstore) |
| 970 | toDisk := func(start, commit uint64) { |
| 971 | txn := posting.Oracle().GetTxn(start) |
| 972 | if txn == nil || commit == 0 { |
| 973 | return |
| 974 | } |
| 975 | // If the transaction has failed, we dont need to update it. |
| 976 | if commit != 0 { |
| 977 | txn.Update() |
| 978 | } |
| 979 | // We start with 20 ms, so that we end up waiting 5 mins by the end. |
| 980 | // If there is any transient issue, it should get fixed within that timeframe. |
| 981 | err := x.ExponentialRetry(int(x.Config.MaxRetries), |
| 982 | 20*time.Millisecond, func() error { |
| 983 | err := txn.CommitToDisk(writer, commit) |
| 984 | if err == badger.ErrBannedKey { |
| 985 | glog.Errorf("Error while writing to banned namespace.") |
| 986 | return nil |
| 987 | } |
| 988 | return err |
| 989 | }) |
| 990 | |
| 991 | if err != nil { |
| 992 | glog.Errorf("Error while applying txn status to disk (%d -> %d): %v", |
| 993 | start, commit, err) |
| 994 | panic(err) |
| 995 | } |
| 996 | } |
| 997 | |
| 998 | for _, status := range delta.Txns { |
| 999 | toDisk(status.StartTs, status.CommitTs) |
| 1000 | } |
| 1001 | if err := writer.Flush(); err != nil { |
| 1002 | return errors.Wrapf(err, "while flushing to disk") |
| 1003 | } |
| 1004 | |
| 1005 | if x.WorkerConfig.HardSync { |
| 1006 | if err := pstore.Sync(); err != nil { |
| 1007 | glog.Errorf("Error while calling Sync while commitOrAbort: %v", err) |
| 1008 | } |
| 1009 | } |
| 1010 | |
| 1011 | g := groups() |
| 1012 | if delta.GroupChecksums != nil && delta.GroupChecksums[g.groupId()] > 0 { |
| 1013 | atomic.StoreUint64(&g.deltaChecksum, delta.GroupChecksums[g.groupId()]) |
| 1014 | } |
| 1015 | |
| 1016 | // Clear all the cached lists that were touched by this transaction. |
| 1017 | for _, status := range delta.Txns { |
| 1018 | txn := posting.Oracle().GetTxn(status.StartTs) |
| 1019 | txn.UpdateCachedKeys(status.CommitTs) |
| 1020 | } |
| 1021 | |
| 1022 | // Now advance Oracle(), so we can service waiting reads. |
| 1023 | posting.Oracle().ProcessDelta(delta) |
no test coverage detected