MCPcopy
hub / github.com/dgraph-io/dgraph / processApplyCh

Method processApplyCh

worker/draft.go:863–963  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

861}
862
863func (n *node) processApplyCh() {
864 defer n.closer.Done() // CLOSER:1
865
866 type P struct {
867 err error
868 size int
869 seen time.Time
870 }
871 previous := make(map[uint64]*P)
872
873 // This function must be run serially.
874 handle := func(entries []raftpb.Entry) {
875 glog.V(3).Infof("handling element in applyCh with #entries %v", len(entries))
876 defer glog.V(3).Infof("done handling element in applyCh")
877
878 var totalSize int64
879 for _, entry := range entries {
880 x.AssertTrue(len(entry.Data) > 0)
881
882 // We use the size as a double check to ensure that we're
883 // working with the same proposal as before.
884 psz := entry.Size()
885 totalSize += int64(psz)
886
887 var proposal pb.Proposal
888 key := binary.BigEndian.Uint64(entry.Data[:8])
889 x.Check(proto.Unmarshal(entry.Data[8:], &proposal))
890 proposal.Index = entry.Index
891 updateStartTs(&proposal)
892
893 var perr error
894 p, ok := previous[key]
895 if ok && p.err == nil && p.size == psz {
896 msg := fmt.Sprintf("Proposal with key: %d already applied. Skipping index: %d."+
897 " Delta: %+v Snapshot: %+v.\n",
898 key, proposal.Index, proposal.Delta, proposal.Snapshot)
899 glog.Infof(msg)
900 previous[key].seen = time.Now() // Update the ts.
901 // Don't break here. We still need to call the Done below.
902
903 } else {
904 // if this applyCommitted fails, how do we ensure
905 start := time.Now()
906 perr = n.applyCommitted(&proposal, key)
907 if key != 0 {
908 p := &P{err: perr, size: psz, seen: time.Now()}
909 previous[key] = p
910 }
911 span := trace.SpanFromContext(n.ctx)
912 if perr != nil {
913 glog.Errorf("Applying proposal. Error: %v. Proposal: %q.", perr, &proposal)
914 span.AddEvent(fmt.Sprintf("Applying proposal failed. Error: %v Proposal: %q", perr, &proposal))
915 }
916 span.AddEvent("Applied proposal with key: %d, index: %d. Err: %v",
917 trace.WithAttributes(
918 attribute.Int64("key", int64(key)),
919 attribute.Int64("index", int64(proposal.Index)),
920 ))

Callers 1

InitAndStartNodeMethod · 0.95

Calls 12

applyCommittedMethod · 0.95
AssertTrueFunction · 0.92
CheckFunction · 0.92
SinceMsFunction · 0.92
updateStartTsFunction · 0.85
InfofMethod · 0.80
UpsertMethod · 0.80
RecordMethod · 0.80
WarningfMethod · 0.80
DoneMethod · 0.45
SizeMethod · 0.45
ErrorfMethod · 0.45

Tested by

no test coverage detected