Forward client request message from the Topic Proxy to the Topic Master (cluster node which owns the topic).
(reqType ProxyReqType, msg *ClientComMessage, topic string, sess *Session)
| 874 | |
| 875 | // Forward client request message from the Topic Proxy to the Topic Master (cluster node which owns the topic). |
| 876 | func (c *Cluster) routeToTopicMaster(reqType ProxyReqType, msg *ClientComMessage, topic string, sess *Session) error { |
| 877 | if c == nil { |
| 878 | // Cluster may be nil due to shutdown. |
| 879 | return nil |
| 880 | } |
| 881 | |
| 882 | if sess != nil && reqType != ProxyReqLeave { |
| 883 | if atomic.LoadInt32(&sess.terminating) > 0 { |
| 884 | // The session is terminating. |
| 885 | // Do not forward any requests except "leave" to the topic master. |
| 886 | return nil |
| 887 | } |
| 888 | } |
| 889 | |
| 890 | req := c.makeClusterReq(reqType, msg, topic, sess) |
| 891 | |
| 892 | // Find the cluster node which owns the topic, then forward to it. |
| 893 | n := c.nodeForTopic(topic) |
| 894 | if n == nil { |
| 895 | return errors.New("node for topic not found") |
| 896 | } |
| 897 | return n.proxyToMasterAsync(req) |
| 898 | } |
| 899 | |
| 900 | // Forward server response message to the node that owns the topic. |
| 901 | func (c *Cluster) routeToTopicIntraCluster(topic string, msg *ServerComMessage, sess *Session) error { |
no test coverage detected