(hub *Hub)
| 15 | ) |
| 16 | |
| 17 | func (t *Topic) runProxy(hub *Hub) { |
| 18 | killTimer := time.NewTimer(time.Hour) |
| 19 | killTimer.Stop() |
| 20 | |
| 21 | for { |
| 22 | select { |
| 23 | case msg := <-t.reg: |
| 24 | // Request to add a connection to this topic |
| 25 | if t.isInactive() { |
| 26 | msg.sess.queueOut(ErrLockedReply(msg, types.TimeNow())) |
| 27 | } else if err := globals.cluster.routeToTopicMaster(ProxyReqJoin, msg, t.name, msg.sess); err != nil { |
| 28 | // Response (ctrl message) will be handled when it's received via the proxy channel. |
| 29 | logs.Warn.Printf("proxy topic[%s]: route join request from proxy to master failed - %s", t.name, err) |
| 30 | msg.sess.queueOut(ErrClusterUnreachableReply(msg, types.TimeNow())) |
| 31 | } |
| 32 | if msg.sess.inflightReqs != nil { |
| 33 | msg.sess.inflightReqs.Done() |
| 34 | } |
| 35 | |
| 36 | case msg := <-t.unreg: |
| 37 | if !t.handleProxyLeaveRequest(msg, killTimer) { |
| 38 | sid := "nil" |
| 39 | if msg.sess != nil { |
| 40 | sid = msg.sess.sid |
| 41 | } |
| 42 | logs.Warn.Printf("proxy topic[%s]: failed to update proxy topic state for leave request - sid %s", t.name, sid) |
| 43 | msg.sess.queueOut(ErrClusterUnreachableReply(msg, types.TimeNow())) |
| 44 | } |
| 45 | if msg.init && msg.sess.inflightReqs != nil { |
| 46 | // If it's a client initiated request. |
| 47 | msg.sess.inflightReqs.Done() |
| 48 | } |
| 49 | |
| 50 | case msg := <-t.clientMsg: |
| 51 | // Content message intended for broadcasting to recipients |
| 52 | if err := globals.cluster.routeToTopicMaster(ProxyReqBroadcast, msg, t.name, msg.sess); err != nil { |
| 53 | logs.Warn.Printf("topic proxy[%s]: route broadcast request from proxy to master failed - %s", t.name, err) |
| 54 | msg.sess.queueOut(ErrClusterUnreachableReply(msg, types.TimeNow())) |
| 55 | } |
| 56 | |
| 57 | case msg := <-t.serverMsg: |
| 58 | if msg.Info != nil || msg.Pres != nil { |
| 59 | globals.cluster.routeToTopicIntraCluster(t.name, msg, msg.sess) |
| 60 | } else { |
| 61 | // FIXME: should something be done here? |
| 62 | logs.Err.Printf("ERROR!!! topic proxy[%s]: unexpected server-side message in proxy topic %s", t.name, msg.describe()) |
| 63 | } |
| 64 | |
| 65 | case msg := <-t.meta: |
| 66 | // Request to get/set topic metadata |
| 67 | if err := globals.cluster.routeToTopicMaster(ProxyReqMeta, msg, t.name, msg.sess); err != nil { |
| 68 | logs.Warn.Printf("proxy topic[%s]: route meta request from proxy to master failed - %s", t.name, err) |
| 69 | msg.sess.queueOut(ErrClusterUnreachableReply(msg, types.TimeNow())) |
| 70 | } |
| 71 | |
| 72 | case upd := <-t.supd: |
| 73 | // Either an update to 'me' user agent from one of the sessions or |
| 74 | // background session comes to foreground. |
no test coverage detected