(to uint64, s *stream)
| 396 | } |
| 397 | |
| 398 | func (n *Node) streamMessages(to uint64, s *stream) { |
| 399 | defer atomic.StoreInt32(&s.alive, 0) |
| 400 | |
| 401 | // Exit after this deadline. Let BatchAndSendMessages create another goroutine, if needed. |
| 402 | // Let's set the deadline to 10s because if we increase it, then it takes longer to recover from |
| 403 | // a partition and get a new leader. |
| 404 | deadline := time.Now().Add(10 * time.Second) |
| 405 | ticker := time.Tick(time.Second) |
| 406 | |
| 407 | var logged int |
| 408 | for range ticker { // Don't do this in a busy-wait loop, use a ticker. |
| 409 | // doSendMessage would block doing a stream. So, time.Now().After is |
| 410 | // only there to avoid a busy-wait. |
| 411 | if err := n.doSendMessage(to, s.msgCh); err != nil { |
| 412 | // Update lastLog so we print error only a few times if we are not able to connect. |
| 413 | // Otherwise, the log is polluted with repeated errors. |
| 414 | if logged == 0 { |
| 415 | glog.Warningf("Unable to send message to peer: %#x. Error: %v", to, err) |
| 416 | logged++ |
| 417 | } |
| 418 | } |
| 419 | if time.Now().After(deadline) { |
| 420 | return |
| 421 | } |
| 422 | } |
| 423 | } |
| 424 | |
| 425 | func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error { |
| 426 | addr, has := n.Peer(to) |
no test coverage detected