Takes a session leave request, forwards it to the topic master and modifies the local state accordingly. Returns whether the operation was successful.
(msg *ClientComMessage, killTimer *time.Timer)
| 120 | // modifies the local state accordingly. |
| 121 | // Returns whether the operation was successful. |
| 122 | func (t *Topic) handleProxyLeaveRequest(msg *ClientComMessage, killTimer *time.Timer) bool { |
| 123 | // Detach session from topic; session may continue to function. |
| 124 | var asUid types.Uid |
| 125 | if msg.init { |
| 126 | asUid = types.ParseUserId(msg.AsUser) |
| 127 | } |
| 128 | |
| 129 | if asUid.IsZero() { |
| 130 | if pssd, ok := t.sessions[msg.sess]; ok { |
| 131 | asUid = pssd.uid |
| 132 | } else { |
| 133 | logs.Warn.Printf("proxy topic[%s]: leave request sent for unknown session", t.name) |
| 134 | return false |
| 135 | } |
| 136 | } |
| 137 | // Remove the session from the topic without waiting for a response from the master node |
| 138 | // because by the time the response arrives this session may be already gone from the session store |
| 139 | // and we won't be able to find and remove it by its sid. |
| 140 | pssd, result := t.remSession(msg.sess, asUid) |
| 141 | if result { |
| 142 | msg.sess.delSub(t.name) |
| 143 | } |
| 144 | if !msg.init { |
| 145 | // Explicitly specify the uid because the master multiplex session needs to know which |
| 146 | // of its multiple hosted sessions to delete. |
| 147 | msg.AsUser = asUid.UserId() |
| 148 | msg.Leave = &MsgClientLeave{} |
| 149 | msg.init = true |
| 150 | } |
| 151 | // Make sure we set the Original field if it's empty (e.g. when session is terminating altogether). |
| 152 | if msg.Original == "" { |
| 153 | if t.cat == types.TopicCatGrp && t.isChan { |
| 154 | // It's a channel topic. Original topic name depends the subscription type. |
| 155 | if result && pssd.isChanSub { |
| 156 | msg.Original = types.GrpToChn(t.xoriginal) |
| 157 | } else { |
| 158 | msg.Original = t.xoriginal |
| 159 | } |
| 160 | } else { |
| 161 | msg.Original = t.original(asUid) |
| 162 | } |
| 163 | } |
| 164 | |
| 165 | if err := globals.cluster.routeToTopicMaster(ProxyReqLeave, msg, t.name, msg.sess); err != nil { |
| 166 | logs.Warn.Printf("proxy topic[%s]: route leave request from proxy to master failed - %s", t.name, err) |
| 167 | } |
| 168 | if len(t.sessions) == 0 { |
| 169 | // No more sessions attached. Start the countdown. |
| 170 | killTimer.Reset(idleProxyTopicTimeout) |
| 171 | } |
| 172 | return result |
| 173 | } |
| 174 | |
| 175 | // proxyMasterResponse at proxy topic processes a master topic response to an earlier request. |
| 176 | func (t *Topic) proxyMasterResponse(msg *ClusterResp, killTimer *time.Timer) { |
no test coverage detected