MCPcopy
hub / github.com/dgraph-io/dgraph / RaftMessage

Method RaftMessage

conn/raft_server.go:182–264  ·  view source on GitHub ↗

RaftMessage handles RAFT messages.

(server pb.Raft_RaftMessageServer)

Source from the content-addressed store, hash-verified

180
181// RaftMessage handles RAFT messages.
182func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error {
183 ctx := server.Context()
184 if ctx.Err() != nil {
185 return ctx.Err()
186 }
187 span := trace.SpanFromContext(ctx)
188
189 node := w.GetNode()
190 if node == nil || node.Raft() == nil {
191 return ErrNoNode
192 }
193 span.AddEvent(fmt.Sprintf("Stream server is node %d", node.Id))
194
195 var rc *pb.RaftContext
196 raft := node.Raft()
197 step := func(data []byte) error {
198 ctx, cancel := context.WithTimeout(ctx, time.Minute)
199 defer cancel()
200
201 for idx := 0; idx < len(data); {
202 x.AssertTruef(len(data[idx:]) >= 4,
203 "Slice left of size: %v. Expected at least 4.", len(data[idx:]))
204
205 sz := int(binary.LittleEndian.Uint32(data[idx : idx+4]))
206 idx += 4
207 msg := raftpb.Message{}
208 if idx+sz > len(data) {
209 return errors.Errorf(
210 "Invalid query. Specified size %v overflows slice [%v,%v)\n",
211 sz, idx, len(data))
212 }
213 if err := msg.Unmarshal(data[idx : idx+sz]); err != nil {
214 x.Check(err)
215 }
216 // This should be done in order, and not via a goroutine.
217 // Step can block forever. See: https://github.com/etcd-io/etcd/issues/10585
218 // So, add a context with timeout to allow it to get out of the blockage.
219 if glog.V(2) {
220 switch msg.Type {
221 case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp:
222 atomic.AddInt64(&node.heartbeatsIn, 1)
223 case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp:
224 case raftpb.MsgApp, raftpb.MsgAppResp:
225 case raftpb.MsgProp:
226 default:
227 glog.Infof("RaftComm: [%#x] Received msg of type: %s from %#x",
228 msg.To, msg.Type, msg.From)
229 }
230 }
231 if err := raft.Step(ctx, msg); err != nil {
232 glog.Warningf("Error while raft.Step from %#x: %v. Closing RaftMessage stream.",
233 rc.GetId(), err)
234 return errors.Wrapf(err, "error while raft.Step from %#x", rc.GetId())
235 }
236 idx += sz
237 }
238 return nil
239 }

Callers

nothing calls this directly

Implementers 1

UnimplementedRaftServer · protos/pb/pb_grpc.pb.go

Calls 12

GetNode · Method · 0.95
GetId · Method · 0.95
AssertTruef · Function · 0.92
Check · Function · 0.92
Raft · Method · 0.80
Infof · Method · 0.80
Step · Method · 0.80
Warningf · Method · 0.80
Recv · Method · 0.65
Connect · Method · 0.65
Errorf · Method · 0.45
GetContext · Method · 0.45

Tested by

no test coverage detected