()
| 149 | } |
| 150 | |
| 151 | func (h *Hub) run() { |
| 152 | for { |
| 153 | select { |
| 154 | case join := <-h.join: |
| 155 | // Handle a subscription request: |
| 156 | // 1. Init topic |
| 157 | // 1.1 If a new topic is requested, create it |
| 158 | // 1.2 If a new subscription to an existing topic is requested: |
| 159 | // 1.2.1 check if topic is already loaded |
| 160 | // 1.2.2 if not, load it |
| 161 | // 1.2.3 if it cannot be loaded (not found), fail |
| 162 | // 2. Check access rights and reject, if appropriate |
| 163 | // 3. Attach session to the topic |
| 164 | // Is the topic already loaded? |
| 165 | t := h.topicGet(join.RcptTo) |
| 166 | if t == nil { |
| 167 | // Topic does not exist or not loaded. |
| 168 | t = &Topic{ |
| 169 | name: join.RcptTo, |
| 170 | xoriginal: join.Original, |
| 171 | // Indicates a proxy topic. |
| 172 | isProxy: globals.cluster.isRemoteTopic(join.RcptTo), |
| 173 | sessions: make(map[*Session]perSessionData), |
| 174 | clientMsg: make(chan *ClientComMessage, 192), |
| 175 | serverMsg: make(chan *ServerComMessage, 64), |
| 176 | reg: make(chan *ClientComMessage, 256), |
| 177 | unreg: make(chan *ClientComMessage, 256), |
| 178 | meta: make(chan *ClientComMessage, 64), |
| 179 | perUser: make(map[types.Uid]perUserData), |
| 180 | exit: make(chan *shutDown, 1), |
| 181 | } |
| 182 | if globals.cluster != nil { |
| 183 | if t.isProxy { |
| 184 | t.proxy = make(chan *ClusterResp, 128) |
| 185 | t.masterNode = globals.cluster.ring.Get(t.name) |
| 186 | } else { |
| 187 | // It's a master topic. Make a channel for handling |
| 188 | // direct messages from the proxy. |
| 189 | t.master = make(chan *ClusterSessUpdate, 8) |
| 190 | } |
| 191 | } |
| 192 | // Topic is created in suspended state because it's not yet configured. |
| 193 | t.markPaused(true) |
| 194 | // Save topic now to prevent race condition. |
| 195 | h.topicPut(join.RcptTo, t) |
| 196 | |
| 197 | // Configure the topic. |
| 198 | go topicInit(t, join, h) |
| 199 | } else { |
| 200 | // Topic found. |
| 201 | if t.isInactive() { |
| 202 | // Topic is either not ready or being deleted. |
| 203 | if join.sess.inflightReqs != nil { |
| 204 | join.sess.inflightReqs.Done() |
| 205 | } |
| 206 | join.sess.queueOut(ErrLockedReply(join, join.Timestamp)) |
| 207 | continue |
| 208 | } |
no test coverage detected