MCPcopy
hub / github.com/tinode/chat / MessageLoop

Method MessageLoop

server/hdl_grpc.go:38–74  ·  view source on GitHub ↗

Equivalent of starting a new session and a read loop in one.

(stream pbx.Node_MessageLoopServer)

Source from the content-addressed store, hash-verified

36
37// Equivalent of starting a new session and a read loop in one.
38func (*grpcNodeServer) MessageLoop(stream pbx.Node_MessageLoopServer) error {
39 sess, count := globals.sessionStore.NewSession(stream, "")
40 if p, ok := peer.FromContext(stream.Context()); ok {
41 sess.remoteAddr = p.Addr.String()
42 }
43 logs.Info.Println("grpc: session started", sess.sid, sess.remoteAddr, count)
44
45 defer func() {
46 sess.closeGrpc()
47 sess.cleanUp(false)
48 }()
49
50 go sess.writeGrpcLoop()
51
52 for {
53 in, err := stream.Recv()
54 if err == io.EOF {
55 return nil
56 }
57 if err != nil {
58 logs.Err.Println("grpc: recv", sess.sid, err)
59 return err
60 }
61 logs.Info.Println("grpc in:", truncateStringIfTooLong(in.String()), sess.sid)
62 statsInc("IncomingMessagesGrpcTotal", 1)
63 sess.dispatch(pbCliDeserialize(in))
64
65 sess.lock.Lock()
66 if sess.grpcnode == nil {
67 sess.lock.Unlock()
68 break
69 }
70 sess.lock.Unlock()
71 }
72
73 return nil
74}
75
76func (sess *Session) sendMessageGrpc(msg any) bool {
77 if len(sess.send) > sendQueueLimit {

Callers

nothing calls this directly

Calls 13

closeGrpc (Method) · 0.95
cleanUp (Method) · 0.95
writeGrpcLoop (Method) · 0.95
dispatch (Method) · 0.95
truncateStringIfTooLong (Function) · 0.85
statsInc (Function) · 0.85
pbCliDeserialize (Function) · 0.85
NewSession (Method) · 0.80
Println (Method) · 0.80
Lock (Method) · 0.80
Unlock (Method) · 0.80
Recv (Method) · 0.65

Tested by

no test coverage detected