()
| 861 | } |
| 862 | |
| 863 | func (n *node) processApplyCh() { |
| 864 | defer n.closer.Done() // CLOSER:1 |
| 865 | |
| 866 | type P struct { |
| 867 | err error |
| 868 | size int |
| 869 | seen time.Time |
| 870 | } |
| 871 | previous := make(map[uint64]*P) |
| 872 | |
| 873 | // This function must be run serially. |
| 874 | handle := func(entries []raftpb.Entry) { |
| 875 | glog.V(3).Infof("handling element in applyCh with #entries %v", len(entries)) |
| 876 | defer glog.V(3).Infof("done handling element in applyCh") |
| 877 | |
| 878 | var totalSize int64 |
| 879 | for _, entry := range entries { |
| 880 | x.AssertTrue(len(entry.Data) > 0) |
| 881 | |
| 882 | // We use the size as a double check to ensure that we're |
| 883 | // working with the same proposal as before. |
| 884 | psz := entry.Size() |
| 885 | totalSize += int64(psz) |
| 886 | |
| 887 | var proposal pb.Proposal |
| 888 | key := binary.BigEndian.Uint64(entry.Data[:8]) |
| 889 | x.Check(proto.Unmarshal(entry.Data[8:], &proposal)) |
| 890 | proposal.Index = entry.Index |
| 891 | updateStartTs(&proposal) |
| 892 | |
| 893 | var perr error |
| 894 | p, ok := previous[key] |
| 895 | if ok && p.err == nil && p.size == psz { |
| 896 | msg := fmt.Sprintf("Proposal with key: %d already applied. Skipping index: %d."+ |
| 897 | " Delta: %+v Snapshot: %+v.\n", |
| 898 | key, proposal.Index, proposal.Delta, proposal.Snapshot) |
| 899 | glog.Infof(msg) |
| 900 | previous[key].seen = time.Now() // Update the ts. |
| 901 | // Don't break here. We still need to call the Done below. |
| 902 | |
| 903 | } else { |
| 904 | // if this applyCommitted fails, how do we ensure |
| 905 | start := time.Now() |
| 906 | perr = n.applyCommitted(&proposal, key) |
| 907 | if key != 0 { |
| 908 | p := &P{err: perr, size: psz, seen: time.Now()} |
| 909 | previous[key] = p |
| 910 | } |
| 911 | span := trace.SpanFromContext(n.ctx) |
| 912 | if perr != nil { |
| 913 | glog.Errorf("Applying proposal. Error: %v. Proposal: %q.", perr, &proposal) |
| 914 | span.AddEvent(fmt.Sprintf("Applying proposal failed. Error: %v Proposal: %q", perr, &proposal)) |
| 915 | } |
| 916 | span.AddEvent("Applied proposal with key: %d, index: %d. Err: %v", |
| 917 | trace.WithAttributes( |
| 918 | attribute.Int64("key", int64(key)), |
| 919 | attribute.Int64("index", int64(proposal.Index)), |
| 920 | )) |
no test coverage detected