MCPcopy
hub / github.com/tinode/chat / run

Method run

server/hub.go:151–344  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

149}
150
151func (h *Hub) run() {
152 for {
153 select {
154 case join := <-h.join:
155 // Handle a subscription request:
156 // 1. Init topic
157 // 1.1 If a new topic is requested, create it
158 // 1.2 If a new subscription to an existing topic is requested:
159 // 1.2.1 check if topic is already loaded
160 // 1.2.2 if not, load it
161 // 1.2.3 if it cannot be loaded (not found), fail
162 // 2. Check access rights and reject, if appropriate
163 // 3. Attach session to the topic
164 // Is the topic already loaded?
165 t := h.topicGet(join.RcptTo)
166 if t == nil {
167 // Topic does not exist or not loaded.
168 t = &Topic{
169 name: join.RcptTo,
170 xoriginal: join.Original,
171 // Indicates a proxy topic.
172 isProxy: globals.cluster.isRemoteTopic(join.RcptTo),
173 sessions: make(map[*Session]perSessionData),
174 clientMsg: make(chan *ClientComMessage, 192),
175 serverMsg: make(chan *ServerComMessage, 64),
176 reg: make(chan *ClientComMessage, 256),
177 unreg: make(chan *ClientComMessage, 256),
178 meta: make(chan *ClientComMessage, 64),
179 perUser: make(map[types.Uid]perUserData),
180 exit: make(chan *shutDown, 1),
181 }
182 if globals.cluster != nil {
183 if t.isProxy {
184 t.proxy = make(chan *ClusterResp, 128)
185 t.masterNode = globals.cluster.ring.Get(t.name)
186 } else {
187 // It's a master topic. Make a channel for handling
188 // direct messages from the proxy.
189 t.master = make(chan *ClusterSessUpdate, 8)
190 }
191 }
192 // Topic is created in suspended state because it's not yet configured.
193 t.markPaused(true)
194 // Save topic now to prevent race condition.
195 h.topicPut(join.RcptTo, t)
196
197 // Configure the topic.
198 go topicInit(t, join, h)
199 } else {
200 // Topic found.
201 if t.isInactive() {
202 // Topic is either not ready or being deleted.
203 if join.sess.inflightReqs != nil {
204 join.sess.inflightReqs.Done()
205 }
206 join.sess.queueOut(ErrLockedReply(join, join.Timestamp))
207 continue
208 }

Callers 2

newHub (Function) · 0.95
topicInit (Function) · 0.45

Calls 15

topicGet (Method) · 0.95
topicPut (Method) · 0.95
topicsStateForUser (Method) · 0.95
topicUnreg (Method) · 0.95
stopTopicsForUser (Method) · 0.95
TimeNow (Function) · 0.92
topicInit (Function) · 0.85
ErrLockedReply (Function) · 0.85
NoErrAcceptedExplicitTs (Function) · 0.85
replyOfflineTopicGetDesc (Function) · 0.85
replyOfflineTopicGetSub (Function) · 0.85

Tested by

no test coverage detected