MCPcopy
hub / github.com/tinode/chat / runProxy

Method runProxy

server/topic_proxy.go:17–117  ·  view source on GitHub ↗
(hub *Hub)

Source from the content-addressed store, hash-verified

15)
16
17func (t *Topic) runProxy(hub *Hub) {
18 killTimer := time.NewTimer(time.Hour)
19 killTimer.Stop()
20
21 for {
22 select {
23 case msg := <-t.reg:
24 // Request to add a connection to this topic
25 if t.isInactive() {
26 msg.sess.queueOut(ErrLockedReply(msg, types.TimeNow()))
27 } else if err := globals.cluster.routeToTopicMaster(ProxyReqJoin, msg, t.name, msg.sess); err != nil {
28 // Response (ctrl message) will be handled when it's received via the proxy channel.
29 logs.Warn.Printf("proxy topic[%s]: route join request from proxy to master failed - %s", t.name, err)
30 msg.sess.queueOut(ErrClusterUnreachableReply(msg, types.TimeNow()))
31 }
32 if msg.sess.inflightReqs != nil {
33 msg.sess.inflightReqs.Done()
34 }
35
36 case msg := <-t.unreg:
37 if !t.handleProxyLeaveRequest(msg, killTimer) {
38 sid := "nil"
39 if msg.sess != nil {
40 sid = msg.sess.sid
41 }
42 logs.Warn.Printf("proxy topic[%s]: failed to update proxy topic state for leave request - sid %s", t.name, sid)
43 msg.sess.queueOut(ErrClusterUnreachableReply(msg, types.TimeNow()))
44 }
45 if msg.init && msg.sess.inflightReqs != nil {
46 // If it's a client initiated request.
47 msg.sess.inflightReqs.Done()
48 }
49
50 case msg := <-t.clientMsg:
51 // Content message intended for broadcasting to recipients
52 if err := globals.cluster.routeToTopicMaster(ProxyReqBroadcast, msg, t.name, msg.sess); err != nil {
53 logs.Warn.Printf("topic proxy[%s]: route broadcast request from proxy to master failed - %s", t.name, err)
54 msg.sess.queueOut(ErrClusterUnreachableReply(msg, types.TimeNow()))
55 }
56
57 case msg := <-t.serverMsg:
58 if msg.Info != nil || msg.Pres != nil {
59 globals.cluster.routeToTopicIntraCluster(t.name, msg, msg.sess)
60 } else {
61 // FIXME: should something be done here?
62 logs.Err.Printf("ERROR!!! topic proxy[%s]: unexpected server-side message in proxy topic %s", t.name, msg.describe())
63 }
64
65 case msg := <-t.meta:
66 // Request to get/set topic metadata
67 if err := globals.cluster.routeToTopicMaster(ProxyReqMeta, msg, t.name, msg.sess); err != nil {
68 logs.Warn.Printf("proxy topic[%s]: route meta request from proxy to master failed - %s", t.name, err)
69 msg.sess.queueOut(ErrClusterUnreachableReply(msg, types.TimeNow()))
70 }
71
72 case upd := <-t.supd:
73 // Either an update to 'me' user agent from one of the sessions or
74 // background session comes to foreground.

Callers 1

runMethod · 0.95

Calls 14

isInactiveMethod · 0.95
proxyMasterResponseMethod · 0.95
TimeNowFunction · 0.92
ErrLockedReplyFunction · 0.85
queueOutMethod · 0.80
routeToTopicMasterMethod · 0.80
DoneMethod · 0.80
detachSessionMethod · 0.80
topicProxyGoneMethod · 0.80

Tested by

no test coverage detected