MCPcopy
hub / github.com/dgraph-io/dgraph / commitOrAbort

Method commitOrAbort

worker/draft.go:966–1025  ·  view source on GitHub ↗

TODO(Anurag - 4 May 2020): Are we using pkey? Remove if unused.

(pkey uint64, delta *pb.OracleDelta)

Source from the content-addressed store, hash-verified

964
965// TODO(Anurag - 4 May 2020): Are we using pkey? Remove if unused.
966func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error {
967 x.PrintOracleDelta(delta)
968 // First let's commit all mutations to disk.
969 writer := posting.NewTxnWriter(pstore)
970 toDisk := func(start, commit uint64) {
971 txn := posting.Oracle().GetTxn(start)
972 if txn == nil || commit == 0 {
973 return
974 }
975 // If the transaction has failed, we dont need to update it.
976 if commit != 0 {
977 txn.Update()
978 }
979 // We start with 20 ms, so that we end up waiting 5 mins by the end.
980 // If there is any transient issue, it should get fixed within that timeframe.
981 err := x.ExponentialRetry(int(x.Config.MaxRetries),
982 20*time.Millisecond, func() error {
983 err := txn.CommitToDisk(writer, commit)
984 if err == badger.ErrBannedKey {
985 glog.Errorf("Error while writing to banned namespace.")
986 return nil
987 }
988 return err
989 })
990
991 if err != nil {
992 glog.Errorf("Error while applying txn status to disk (%d -> %d): %v",
993 start, commit, err)
994 panic(err)
995 }
996 }
997
998 for _, status := range delta.Txns {
999 toDisk(status.StartTs, status.CommitTs)
1000 }
1001 if err := writer.Flush(); err != nil {
1002 return errors.Wrapf(err, "while flushing to disk")
1003 }
1004
1005 if x.WorkerConfig.HardSync {
1006 if err := pstore.Sync(); err != nil {
1007 glog.Errorf("Error while calling Sync while commitOrAbort: %v", err)
1008 }
1009 }
1010
1011 g := groups()
1012 if delta.GroupChecksums != nil && delta.GroupChecksums[g.groupId()] > 0 {
1013 atomic.StoreUint64(&g.deltaChecksum, delta.GroupChecksums[g.groupId()])
1014 }
1015
1016 // Clear all the cached lists that were touched by this transaction.
1017 for _, status := range delta.Txns {
1018 txn := posting.Oracle().GetTxn(status.StartTs)
1019 txn.UpdateCachedKeys(status.CommitTs)
1020 }
1021
1022 // Now advance Oracle(), so we can service waiting reads.
1023 posting.Oracle().ProcessDelta(delta)

Callers 2

applyCommittedMethod · 0.95
ApplyCommitedFunction · 0.80

Calls 14

FlushMethod · 0.95
PrintOracleDeltaFunction · 0.92
NewTxnWriterFunction · 0.92
OracleFunction · 0.92
ExponentialRetryFunction · 0.92
groupsFunction · 0.85
CommitToDiskMethod · 0.80
groupIdMethod · 0.80
UpdateCachedKeysMethod · 0.80
ProcessDeltaMethod · 0.80
SyncMethod · 0.65
GetTxnMethod · 0.45

Tested by

no test coverage detected