Request to subscribe to a topic.
(msg *ClientComMessage)
| 615 | |
| 616 | // Request to subscribe to a topic. |
| 617 | func (s *Session) subscribe(msg *ClientComMessage) { |
| 618 | if strings.HasPrefix(msg.Original, "new") || strings.HasPrefix(msg.Original, "nch") { |
| 619 | // Request to create a new group/channel topic. |
| 620 | // If we are in a cluster, make sure the new topic belongs to the current node. |
| 621 | msg.RcptTo = globals.cluster.genLocalTopicName() |
| 622 | } else { |
| 623 | var resp *ServerComMessage |
| 624 | msg.RcptTo, resp = s.expandTopicName(msg) |
| 625 | if resp != nil { |
| 626 | s.queueOut(resp) |
| 627 | return |
| 628 | } |
| 629 | } |
| 630 | |
| 631 | s.inflightReqs.Add(1) |
| 632 | // Session can subscribe to topic on behalf of a single user at a time. |
| 633 | if sub := s.getSub(msg.RcptTo); sub != nil { |
| 634 | s.queueOut(InfoAlreadySubscribed(msg.Id, msg.Original, msg.Timestamp)) |
| 635 | s.inflightReqs.Done() |
| 636 | } else { |
| 637 | select { |
| 638 | case globals.hub.join <- msg: |
| 639 | default: |
| 640 | // Reply with a 503 to the user. |
| 641 | s.queueOut(ErrServiceUnavailableReply(msg, msg.Timestamp)) |
| 642 | s.inflightReqs.Done() |
| 643 | logs.Err.Println("s.subscribe: hub.join queue full, topic ", msg.RcptTo, s.sid) |
| 644 | } |
| 645 | // Hub will send Ctrl success/failure packets back to session |
| 646 | } |
| 647 | } |
| 648 | |
| 649 | // Leave/Unsubscribe a topic |
| 650 | func (s *Session) leave(msg *ClientComMessage) { |
nothing calls this directly
no test coverage detected