MCPcopy
hub / github.com/dgraph-io/dgraph / doSendMessage

Method doSendMessage

conn/node.go:425–522  ·  view source on GitHub ↗
(to uint64, msgCh chan []byte)

Source from the content-addressed store, hash-verified

423}
424
425func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
426 addr, has := n.Peer(to)
427 if !has {
428 return errors.Errorf("Do not have address of peer %#x", to)
429 }
430 pool, err := GetPools().Get(addr)
431 if err != nil {
432 return err
433 }
434
435 c := pb.NewRaftClient(pool.Get())
436 ctx, span := otel.Tracer("").Start(context.Background(),
437 fmt.Sprintf("RaftMessage-%d-to-%d", n.Id, to))
438 defer span.End()
439
440 mc, err := c.RaftMessage(ctx)
441 if err != nil {
442 return err
443 }
444
445 var packets, lastPackets uint64
446 slurp := func(batch *pb.RaftBatch) {
447 for {
448 if len(batch.Payload.Data) > messageBatchSoftLimit {
449 return
450 }
451 select {
452 case data := <-msgCh:
453 batch.Payload.Data = append(batch.Payload.Data, data...)
454 packets++
455 default:
456 return
457 }
458 }
459 }
460
461 ctx = mc.Context()
462
463 fastTick := time.Tick(5 * time.Second)
464 ticker := time.Tick(3 * time.Minute)
465
466 for {
467 select {
468 case data := <-msgCh:
469 batch := &pb.RaftBatch{
470 Context: n.RaftContext,
471 Payload: &api.Payload{Data: data},
472 }
473 slurp(batch) // Pick up more entries from msgCh, if present.
474 span.AddEvent(fmt.Sprintf("[to: %x] [Packets: %d] Sending data of length: %d.",
475 to, packets, len(batch.Payload.Data)))
476 if packets%10000 == 0 {
477 glog.V(2).Infof("[to: %x] [Packets: %d] Sending data of length: %d.",
478 to, packets, len(batch.Payload.Data))
479 }
480 packets++
481 if err := mc.Send(batch); err != nil {
482 span.AddEvent(fmt.Sprintf("Error while mc.Send: %v", err))

Callers 1

streamMessages (Method) · 0.95

Calls 15

Peer (Method) · 0.95
RaftMessage (Method) · 0.95
Raft (Method) · 0.95
IsPeer (Method) · 0.95
NewRaftClient (Function) · 0.92
GetPools (Function) · 0.85
Infof (Method) · 0.80
Warningf (Method) · 0.80
SetUnhealthy (Method) · 0.80
Get (Method) · 0.65
Start (Method) · 0.65
Send (Method) · 0.65

Tested by

no test coverage detected