RaftMessage handles RAFT messages.
(server pb.Raft_RaftMessageServer)
| 180 | |
| 181 | // RaftMessage handles RAFT messages. |
| 182 | func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error { |
| 183 | ctx := server.Context() |
| 184 | if ctx.Err() != nil { |
| 185 | return ctx.Err() |
| 186 | } |
| 187 | span := trace.SpanFromContext(ctx) |
| 188 | |
| 189 | node := w.GetNode() |
| 190 | if node == nil || node.Raft() == nil { |
| 191 | return ErrNoNode |
| 192 | } |
| 193 | span.AddEvent(fmt.Sprintf("Stream server is node %d", node.Id)) |
| 194 | |
| 195 | var rc *pb.RaftContext |
| 196 | raft := node.Raft() |
| 197 | step := func(data []byte) error { |
| 198 | ctx, cancel := context.WithTimeout(ctx, time.Minute) |
| 199 | defer cancel() |
| 200 | |
| 201 | for idx := 0; idx < len(data); { |
| 202 | x.AssertTruef(len(data[idx:]) >= 4, |
| 203 | "Slice left of size: %v. Expected at least 4.", len(data[idx:])) |
| 204 | |
| 205 | sz := int(binary.LittleEndian.Uint32(data[idx : idx+4])) |
| 206 | idx += 4 |
| 207 | msg := raftpb.Message{} |
| 208 | if idx+sz > len(data) { |
| 209 | return errors.Errorf( |
| 210 | "Invalid query. Specified size %v overflows slice [%v,%v)\n", |
| 211 | sz, idx, len(data)) |
| 212 | } |
| 213 | if err := msg.Unmarshal(data[idx : idx+sz]); err != nil { |
| 214 | x.Check(err) |
| 215 | } |
| 216 | // This should be done in order, and not via a goroutine. |
| 217 | // Step can block forever. See: https://github.com/etcd-io/etcd/issues/10585 |
| 218 | // So, add a context with timeout to allow it to get out of the blockage. |
| 219 | if glog.V(2) { |
| 220 | switch msg.Type { |
| 221 | case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp: |
| 222 | atomic.AddInt64(&node.heartbeatsIn, 1) |
| 223 | case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp: |
| 224 | case raftpb.MsgApp, raftpb.MsgAppResp: |
| 225 | case raftpb.MsgProp: |
| 226 | default: |
| 227 | glog.Infof("RaftComm: [%#x] Received msg of type: %s from %#x", |
| 228 | msg.To, msg.Type, msg.From) |
| 229 | } |
| 230 | } |
| 231 | if err := raft.Step(ctx, msg); err != nil { |
| 232 | glog.Warningf("Error while raft.Step from %#x: %v. Closing RaftMessage stream.", |
| 233 | rc.GetId(), err) |
| 234 | return errors.Wrapf(err, "error while raft.Step from %#x", rc.GetId()) |
| 235 | } |
| 236 | idx += sz |
| 237 | } |
| 238 | return nil |
| 239 | } |
nothing calls this directly
no test coverage detected