registerSession handles a session join (registration) request received via the Topic.reg channel.
(msg *ClientComMessage)
| 339 | // registerSession handles a session join (registration) request |
| 340 | // received via the Topic.reg channel. |
| 341 | func (t *Topic) registerSession(msg *ClientComMessage) { |
| 342 | // Request to add a connection to this topic |
| 343 | if t.isInactive() { |
| 344 | msg.sess.queueOut(ErrLockedReply(msg, types.TimeNow())) |
| 345 | } else if msg.sess.getSub(t.name) != nil { |
| 346 | // Session is already subscribed to topic. Subscription is checked in session.go, |
| 347 | // but there is a gap between topic creation/un-pausing and processing the |
| 348 | // first subscription request, before the topic is linked to session: a client |
| 349 | // may send several subscription requests in that gap. |
| 350 | msg.sess.queueOut(InfoAlreadySubscribed(msg.Id, msg.Original, msg.Timestamp)) |
| 351 | } else { |
| 352 | // The topic is alive, so stop the kill timer, if it's ticking. We don't want the topic to die |
| 353 | // while processing the call. |
| 354 | t.killTimer.Stop() |
| 355 | if err := t.handleSubscription(msg); err == nil { |
| 356 | if msg.Sub.Created { |
| 357 | // Call plugins with the new topic |
| 358 | pluginTopic(t, plgActCreate) |
| 359 | } |
| 360 | } else { |
| 361 | if len(t.sessions) == 0 && t.cat != types.TopicCatSys { |
| 362 | // Failed to subscribe, the topic is still inactive |
| 363 | t.killTimer.Reset(idleMasterTopicTimeout) |
| 364 | } |
| 365 | logs.Warn.Printf("topic[%s] subscription failed %v, sid=%s", t.name, err, msg.sess.sid) |
| 366 | } |
| 367 | } |
| 368 | if msg.sess.inflightReqs != nil { |
| 369 | msg.sess.inflightReqs.Done() |
| 370 | } |
| 371 | } |
| 372 | |
| 373 | func (t *Topic) handleMetaGet(msg *ClientComMessage, asUid types.Uid, asChan bool, authLevel auth.Level) { |
| 374 | if msg.MetaWhat&constMsgMetaDesc != 0 { |