startHeartbeat starts a goroutine for handling heartbeat ping messages sent over the given `hbSocket`. The `wg`'s `Done` method is invoked after the goroutine has completely shut down. To request a shutdown, close the returned `shutdown` channel.
(hbSocket Socket, wg *sync.WaitGroup)
| 637 | // `Done` method is invoked after the thread is completely shutdown. To request a shutdown the returned `shutdown` channel |
| 638 | // can be closed. |
| 639 | func startHeartbeat(hbSocket Socket, wg *sync.WaitGroup) (shutdown chan struct{}) { |
| 640 | quit := make(chan struct{}) |
| 641 | |
| 642 | // Start the handler that will echo any received messages back to the sender. |
| 643 | wg.Add(1) |
| 644 | go func() { |
| 645 | defer wg.Done() |
| 646 | |
| 647 | type msgType struct { |
| 648 | Msg zmq4.Msg |
| 649 | Err error |
| 650 | } |
| 651 | |
| 652 | msgs := make(chan msgType) |
| 653 | |
| 654 | go func() { |
| 655 | defer close(msgs) |
| 656 | for { |
| 657 | msg, err := hbSocket.Socket.Recv() |
| 658 | select { |
| 659 | case msgs <- msgType{msg, err}: |
| 660 | case <-quit: |
| 661 | return |
| 662 | } |
| 663 | } |
| 664 | }() |
| 665 | |
| 666 | timeout := time.NewTimer(500 * time.Second) |
| 667 | defer timeout.Stop() |
| 668 | |
| 669 | for { |
| 670 | timeout.Reset(500 * time.Second) |
| 671 | select { |
| 672 | case <-quit: |
| 673 | return |
| 674 | case <-timeout.C: |
| 675 | continue |
| 676 | case v := <-msgs: |
| 677 | hbSocket.RunWithSocket(func(echo zmq4.Socket) error { |
| 678 | if v.Err != nil { |
| 679 | log.Fatalf("Error reading heartbeat ping bytes: %v\n", v.Err) |
| 680 | return v.Err |
| 681 | } |
| 682 | |
| 683 | // Send the received byte string back to let the front-end know that the kernel is alive. |
| 684 | if err := echo.Send(v.Msg); err != nil { |
| 685 | log.Printf("Error sending heartbeat pong bytes: %b\n", err) |
| 686 | return err |
| 687 | } |
| 688 | |
| 689 | return nil |
| 690 | }) |
| 691 | } |
| 692 | } |
| 693 | }() |
| 694 | |
| 695 | return quit |
| 696 | } |
no test coverage detected