Send sends the given RAFT message from this node.
(msg *raftpb.Message)
| 237 | |
| 238 | // Send sends the given RAFT message from this node. |
| 239 | func (n *Node) Send(msg *raftpb.Message) { |
| 240 | x.AssertTruef(n.Id != msg.To, "Sending message to itself") |
| 241 | data, err := msg.Marshal() |
| 242 | x.Check(err) |
| 243 | |
| 244 | if glog.V(2) { |
| 245 | switch msg.Type { |
| 246 | case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp: |
| 247 | atomic.AddInt64(&n.heartbeatsOut, 1) |
| 248 | case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp: |
| 249 | case raftpb.MsgApp, raftpb.MsgAppResp: |
| 250 | case raftpb.MsgProp: |
| 251 | default: |
| 252 | glog.Infof("RaftComm: [%#x] Sending message of type %s to %#x", msg.From, msg.Type, msg.To) |
| 253 | } |
| 254 | } |
| 255 | // As long as leadership is stable, any attempted Propose() calls should be reflected in the |
| 256 | // next raft.Ready.Messages. Leaders will send MsgApps to the followers; followers will send |
| 257 | // MsgProp to the leader. It is up to the transport layer to get those messages to their |
| 258 | // destination. If a MsgApp gets dropped by the transport layer, it will get retried by raft |
| 259 | // (i.e. it will appear in a future Ready.Messages), but MsgProp will only be sent once. During |
| 260 | // leadership transitions, proposals may get dropped even if the network is reliable. |
| 261 | // |
| 262 | // We can't do a select default here. The messages must be sent to the channel, otherwise we |
| 263 | // should block until the channel can accept these messages. BatchAndSendMessages would take |
| 264 | // care of dropping messages which can't be sent due to network issues to the corresponding |
| 265 | // node. But, we shouldn't take the liberty to do that here. It would take us more time to |
| 266 | // repropose these dropped messages anyway, than to block here a bit waiting for the messages |
| 267 | // channel to clear out. |
| 268 | n.messages <- sendmsg{to: msg.To, data: data} |
| 269 | } |
| 270 | |
| 271 | // Snapshot returns the current snapshot. |
| 272 | func (n *Node) Snapshot() (raftpb.Snapshot, error) { |
nothing calls this directly
no test coverage detected