(to uint64, msgCh chan []byte)
| 423 | } |
| 424 | |
| 425 | func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error { |
| 426 | addr, has := n.Peer(to) |
| 427 | if !has { |
| 428 | return errors.Errorf("Do not have address of peer %#x", to) |
| 429 | } |
| 430 | pool, err := GetPools().Get(addr) |
| 431 | if err != nil { |
| 432 | return err |
| 433 | } |
| 434 | |
| 435 | c := pb.NewRaftClient(pool.Get()) |
| 436 | ctx, span := otel.Tracer("").Start(context.Background(), |
| 437 | fmt.Sprintf("RaftMessage-%d-to-%d", n.Id, to)) |
| 438 | defer span.End() |
| 439 | |
| 440 | mc, err := c.RaftMessage(ctx) |
| 441 | if err != nil { |
| 442 | return err |
| 443 | } |
| 444 | |
| 445 | var packets, lastPackets uint64 |
| 446 | slurp := func(batch *pb.RaftBatch) { |
| 447 | for { |
| 448 | if len(batch.Payload.Data) > messageBatchSoftLimit { |
| 449 | return |
| 450 | } |
| 451 | select { |
| 452 | case data := <-msgCh: |
| 453 | batch.Payload.Data = append(batch.Payload.Data, data...) |
| 454 | packets++ |
| 455 | default: |
| 456 | return |
| 457 | } |
| 458 | } |
| 459 | } |
| 460 | |
| 461 | ctx = mc.Context() |
| 462 | |
| 463 | fastTick := time.Tick(5 * time.Second) |
| 464 | ticker := time.Tick(3 * time.Minute) |
| 465 | |
| 466 | for { |
| 467 | select { |
| 468 | case data := <-msgCh: |
| 469 | batch := &pb.RaftBatch{ |
| 470 | Context: n.RaftContext, |
| 471 | Payload: &api.Payload{Data: data}, |
| 472 | } |
| 473 | slurp(batch) // Pick up more entries from msgCh, if present. |
| 474 | span.AddEvent(fmt.Sprintf("[to: %x] [Packets: %d] Sending data of length: %d.", |
| 475 | to, packets, len(batch.Payload.Data))) |
| 476 | if packets%10000 == 0 { |
| 477 | glog.V(2).Infof("[to: %x] [Packets: %d] Sending data of length: %d.", |
| 478 | to, packets, len(batch.Payload.Data)) |
| 479 | } |
| 480 | packets++ |
| 481 | if err := mc.Send(batch); err != nil { |
| 482 | span.AddEvent(fmt.Sprintf("Error while mc.Send: %v", err)) |
no test coverage detected