MCPcopy Index your code
hub / github.com/tinode/chat / clusterWriteLoop

Method clusterWriteLoop

server/cluster.go:1199–1265  ·  view source on GitHub ↗

clusterWriteLoop implements the write loop for a multiplexing (proxy) session at the node which hosts the master topic. The session is a multiplexing session, i.e. it handles requests for multiple sessions at the origin.

(forTopic string)

Source from the content-addressed store, hash-verified

1197// clusterWriteLoop implements write loop for multiplexing (proxy) session at a node which hosts master topic.
1198// The session is a multiplexing session, i.e. it handles requests for multiple sessions at origin.
1199func (sess *Session) clusterWriteLoop(forTopic string) {
1200 terminate := true
1201 defer func() {
1202 if terminate {
1203 sess.closeRPC()
1204 globals.sessionStore.Delete(sess)
1205 sess.inflightReqs = nil
1206 sess.unsubAll()
1207 }
1208 }()
1209
1210 for {
1211 select {
1212 case msg, ok := <-sess.send:
1213 if !ok || sess.clnode.endpoint == nil {
1214 // channel closed
1215 return
1216 }
1217 srvMsg := msg.(*ServerComMessage)
1218 response := &ClusterResp{SrvMsg: srvMsg}
1219 if srvMsg.sess == nil {
1220 response.OrigSid = "*"
1221 } else {
1222 response.OrigReqType = srvMsg.sess.proxyReq
1223 response.OrigSid = srvMsg.sess.sid
1224 srvMsg.AsUser = srvMsg.sess.uid.UserId()
1225
1226 switch srvMsg.sess.proxyReq {
1227 case ProxyReqJoin, ProxyReqLeave, ProxyReqMeta, ProxyReqBgSession, ProxyReqMeUserAgent, ProxyReqCall:
1228 // Do nothing
1229 case ProxyReqBroadcast, ProxyReqNone:
1230 if srvMsg.Data != nil || srvMsg.Pres != nil || srvMsg.Info != nil {
1231 response.OrigSid = "*"
1232 } else if srvMsg.Ctrl == nil {
1233 logs.Warn.Println("cluster: request type not set in clusterWriteLoop", sess.sid,
1234 srvMsg.describe(), "src_sid:", srvMsg.sess.sid)
1235 }
1236 default:
1237 logs.Err.Panicln("cluster: unknown request type in clusterWriteLoop", srvMsg.sess.proxyReq)
1238 }
1239 }
1240
1241 srvMsg.RcptTo = forTopic
1242 response.RcptTo = forTopic
1243
1244 if err := sess.clnode.masterToProxyAsync(response); err != nil {
1245 logs.Warn.Printf("cluster: response to proxy failed \"%s\": %s", sess.sid, err.Error())
1246 return
1247 }
1248 case msg := <-sess.stop:
1249 if msg == nil {
1250 // Terminating multiplexing session.
1251 return
1252 }
1253 // There are two cases of msg != nil:
1254 // * user is being deleted
1255 // * node shutdown
1256 // In both cases the msg does not need to be forwarded to the proxy.

Callers 1

Calls 8

closeRPC · Method · 0.95
unsubAll · Method · 0.95
UserId · Method · 0.80
Println · Method · 0.80
masterToProxyAsync · Method · 0.80
Error · Method · 0.80
Delete · Method · 0.65
describe · Method · 0.45

Tested by

no test coverage detected