proxyMasterResponse at proxy topic processes a master topic response to an earlier request.
(msg *ClusterResp, killTimer *time.Timer)
| 174 | |
| 175 | // proxyMasterResponse at proxy topic processes a master topic response to an earlier request. |
| 176 | func (t *Topic) proxyMasterResponse(msg *ClusterResp, killTimer *time.Timer) { |
| 177 | // Kills topic after a period of inactivity. |
| 178 | keepAlive := idleProxyTopicTimeout |
| 179 | |
| 180 | if msg.SrvMsg.Pres != nil && msg.SrvMsg.Pres.What == "acs" && msg.SrvMsg.Pres.Acs != nil { |
| 181 | // If the server changed acs on this topic, update the internal state. |
| 182 | t.updateAcsFromPresMsg(msg.SrvMsg.Pres) |
| 183 | } |
| 184 | |
| 185 | if msg.OrigSid == "*" { |
| 186 | // It is a broadcast. |
| 187 | switch { |
| 188 | case msg.SrvMsg.Pres != nil || msg.SrvMsg.Data != nil || msg.SrvMsg.Info != nil: |
| 189 | // Regular broadcast. |
| 190 | t.handleProxyBroadcast(msg.SrvMsg) |
| 191 | case msg.SrvMsg.Ctrl != nil: |
| 192 | // Ctrl broadcast. E.g. for user eviction. |
| 193 | t.proxyCtrlBroadcast(msg.SrvMsg) |
| 194 | default: |
| 195 | } |
| 196 | } else { |
| 197 | sess := globals.sessionStore.Get(msg.OrigSid) |
| 198 | if sess == nil { |
| 199 | logs.Warn.Printf("proxy topic[%s]: session %s not found; already terminated?", t.name, msg.OrigSid) |
| 200 | } |
| 201 | switch msg.OrigReqType { |
| 202 | case ProxyReqJoin: |
| 203 | if sess != nil && msg.SrvMsg.Ctrl != nil { |
| 204 | // TODO: do we need to let the master topic know that the subscription is not longer valid |
| 205 | // or is it already informed by the session when it terminated? |
| 206 | |
| 207 | // Subscription result. |
| 208 | if msg.SrvMsg.Ctrl.Code < 300 { |
| 209 | sess.sessionStoreLock.Lock() |
| 210 | // Make sure the session isn't gone yet. |
| 211 | if session := globals.sessionStore.Get(msg.OrigSid); session != nil { |
| 212 | // Successful subscriptions. |
| 213 | t.addSession(session, msg.SrvMsg.uid, types.IsChannel(msg.SrvMsg.Ctrl.Topic)) |
| 214 | session.addSub(t.name, &Subscription{ |
| 215 | broadcast: t.clientMsg, |
| 216 | done: t.unreg, |
| 217 | meta: t.meta, |
| 218 | supd: t.supd, |
| 219 | }) |
| 220 | } |
| 221 | sess.sessionStoreLock.Unlock() |
| 222 | |
| 223 | killTimer.Stop() |
| 224 | } else if len(t.sessions) == 0 { |
| 225 | killTimer.Reset(keepAlive) |
| 226 | } |
| 227 | } |
| 228 | case ProxyReqBroadcast, ProxyReqMeta, ProxyReqCall: |
| 229 | // no processing |
| 230 | case ProxyReqLeave: |
| 231 | if msg.SrvMsg != nil && msg.SrvMsg.Ctrl != nil { |
| 232 | if msg.SrvMsg.Ctrl.Code < 300 { |
| 233 | if sess != nil { |
no test coverage detected