Equivalent of starting a new session and a read loop in one.
(stream pbx.Node_MessageLoopServer)
| 36 | |
| 37 | // Equivalent of starting a new session and a read loop in one. |
| 38 | func (*grpcNodeServer) MessageLoop(stream pbx.Node_MessageLoopServer) error { |
| 39 | sess, count := globals.sessionStore.NewSession(stream, "") |
| 40 | if p, ok := peer.FromContext(stream.Context()); ok { |
| 41 | sess.remoteAddr = p.Addr.String() |
| 42 | } |
| 43 | logs.Info.Println("grpc: session started", sess.sid, sess.remoteAddr, count) |
| 44 | |
| 45 | defer func() { |
| 46 | sess.closeGrpc() |
| 47 | sess.cleanUp(false) |
| 48 | }() |
| 49 | |
| 50 | go sess.writeGrpcLoop() |
| 51 | |
| 52 | for { |
| 53 | in, err := stream.Recv() |
| 54 | if err == io.EOF { |
| 55 | return nil |
| 56 | } |
| 57 | if err != nil { |
| 58 | logs.Err.Println("grpc: recv", sess.sid, err) |
| 59 | return err |
| 60 | } |
| 61 | logs.Info.Println("grpc in:", truncateStringIfTooLong(in.String()), sess.sid) |
| 62 | statsInc("IncomingMessagesGrpcTotal", 1) |
| 63 | sess.dispatch(pbCliDeserialize(in)) |
| 64 | |
| 65 | sess.lock.Lock() |
| 66 | if sess.grpcnode == nil { |
| 67 | sess.lock.Unlock() |
| 68 | break |
| 69 | } |
| 70 | sess.lock.Unlock() |
| 71 | } |
| 72 | |
| 73 | return nil |
| 74 | } |
| 75 | |
| 76 | func (sess *Session) sendMessageGrpc(msg any) bool { |
| 77 | if len(sess.send) > sendQueueLimit { |
nothing calls this directly
no test coverage detected