MCPcopy Index your code
hub / github.com/gopherdata/gophernotes / startHeartbeat

Function startHeartbeat

kernel.go:639–696  ·  view source on GitHub ↗

startHeartbeat starts a goroutine for handling heartbeat ping messages sent over the given `hbSocket`. The `wg`'s `Done` method is invoked after the goroutine has completely shut down. To request a shutdown, close the returned `shutdown` channel.

(hbSocket Socket, wg *sync.WaitGroup)

Source from the content-addressed store, hash-verified

637// `Done` method is invoked after the thread is completely shutdown. To request a shutdown the returned `shutdown` channel
638// can be closed.
639func startHeartbeat(hbSocket Socket, wg *sync.WaitGroup) (shutdown chan struct{}) {
640 quit := make(chan struct{})
641
642 // Start the handler that will echo any received messages back to the sender.
643 wg.Add(1)
644 go func() {
645 defer wg.Done()
646
647 type msgType struct {
648 Msg zmq4.Msg
649 Err error
650 }
651
652 msgs := make(chan msgType)
653
654 go func() {
655 defer close(msgs)
656 for {
657 msg, err := hbSocket.Socket.Recv()
658 select {
659 case msgs <- msgType{msg, err}:
660 case <-quit:
661 return
662 }
663 }
664 }()
665
666 timeout := time.NewTimer(500 * time.Second)
667 defer timeout.Stop()
668
669 for {
670 timeout.Reset(500 * time.Second)
671 select {
672 case <-quit:
673 return
674 case <-timeout.C:
675 continue
676 case v := <-msgs:
677 hbSocket.RunWithSocket(func(echo zmq4.Socket) error {
678 if v.Err != nil {
679 log.Fatalf("Error reading heartbeat ping bytes: %v\n", v.Err)
680 return v.Err
681 }
682
683 // Send the received byte string back to let the front-end know that the kernel is alive.
684 if err := echo.Send(v.Msg); err != nil {
685 log.Printf("Error sending heartbeat pong bytes: %b\n", err)
686 return err
687 }
688
689 return nil
690 })
691 }
692 }
693 }()
694
695 return quit
696}

Callers 1

runKernelFunction · 0.85

Calls 1

RunWithSocketMethod · 0.80

Tested by

no test coverage detected