Forwards a server response message to the cluster node that owns the topic.
(topic string, msg *ServerComMessage, sess *Session)
| 899 | |
| 900 | // Forward server response message to the node that owns topic. |
| 901 | func (c *Cluster) routeToTopicIntraCluster(topic string, msg *ServerComMessage, sess *Session) error { |
| 902 | if c == nil { |
| 903 | // Cluster may be nil due to shutdown. |
| 904 | return nil |
| 905 | } |
| 906 | |
| 907 | n := c.nodeForTopic(topic) |
| 908 | if n == nil { |
| 909 | return errors.New("node for topic not found (intra)") |
| 910 | } |
| 911 | |
| 912 | route := &ClusterRoute{ |
| 913 | Node: c.thisNodeName, |
| 914 | Signature: c.ring.Signature(), |
| 915 | Fingerprint: c.fingerprint, |
| 916 | SrvMsg: msg, |
| 917 | } |
| 918 | |
| 919 | if sess != nil { |
| 920 | route.Sess = &ClusterSess{Sid: sess.sid} |
| 921 | } |
| 922 | return n.route(route) |
| 923 | } |
| 924 | |
| 925 | // Topic proxy terminated. Inform remote Master node that the proxy is gone. |
| 926 | func (c *Cluster) topicProxyGone(topicName string) error { |
no test coverage detected