MCPcopy
hub / github.com/dgraph-io/dgraph / streamMessages

Method streamMessages

conn/node.go:398–423  ·  view source on GitHub ↗
(to uint64, s *stream)

Source from the content-addressed store, hash-verified

396}
397
398func (n *Node) streamMessages(to uint64, s *stream) {
399 defer atomic.StoreInt32(&s.alive, 0)
400
401 // Exit after this deadline. Let BatchAndSendMessages create another goroutine, if needed.
402 // Let's set the deadline to 10s because if we increase it, then it takes longer to recover from
403 // a partition and get a new leader.
404 deadline := time.Now().Add(10 * time.Second)
405 ticker := time.Tick(time.Second)
406
407 var logged int
408 for range ticker { // Don't do this in a busy-wait loop, use a ticker.
409 // doSendMessage would block doing a stream. So, time.Now().After is
410 // only there to avoid a busy-wait.
411 if err := n.doSendMessage(to, s.msgCh); err != nil {
412 // Update lastLog so we print error only a few times if we are not able to connect.
413 // Otherwise, the log is polluted with repeated errors.
414 if logged == 0 {
415 glog.Warningf("Unable to send message to peer: %#x. Error: %v", to, err)
416 logged++
417 }
418 }
419 if time.Now().After(deadline) {
420 return
421 }
422 }
423}
424
425func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
426 addr, has := n.Peer(to)

Callers 1

BatchAndSendMessagesMethod · 0.95

Calls 3

doSendMessageMethod · 0.95
WarningfMethod · 0.80
AddMethod · 0.45

Tested by

no test coverage detected