MCPcopy
hub / github.com/tinode/chat / proxyMasterResponse

Method proxyMasterResponse

server/topic_proxy.go:176–252  ·  view source on GitHub ↗

proxyMasterResponse runs at the proxy topic and processes the master topic's response to an earlier request.

(msg *ClusterResp, killTimer *time.Timer)

Source from the content-addressed store, hash-verified

174
175// proxyMasterResponse at proxy topic processes a master topic response to an earlier request.
176func (t *Topic) proxyMasterResponse(msg *ClusterResp, killTimer *time.Timer) {
177 // Kills topic after a period of inactivity.
178 keepAlive := idleProxyTopicTimeout
179
180 if msg.SrvMsg.Pres != nil && msg.SrvMsg.Pres.What == "acs" && msg.SrvMsg.Pres.Acs != nil {
181 // If the server changed acs on this topic, update the internal state.
182 t.updateAcsFromPresMsg(msg.SrvMsg.Pres)
183 }
184
185 if msg.OrigSid == "*" {
186 // It is a broadcast.
187 switch {
188 case msg.SrvMsg.Pres != nil || msg.SrvMsg.Data != nil || msg.SrvMsg.Info != nil:
189 // Regular broadcast.
190 t.handleProxyBroadcast(msg.SrvMsg)
191 case msg.SrvMsg.Ctrl != nil:
192 // Ctrl broadcast. E.g. for user eviction.
193 t.proxyCtrlBroadcast(msg.SrvMsg)
194 default:
195 }
196 } else {
197 sess := globals.sessionStore.Get(msg.OrigSid)
198 if sess == nil {
199 logs.Warn.Printf("proxy topic[%s]: session %s not found; already terminated?", t.name, msg.OrigSid)
200 }
201 switch msg.OrigReqType {
202 case ProxyReqJoin:
203 if sess != nil && msg.SrvMsg.Ctrl != nil {
204 // TODO: do we need to let the master topic know that the subscription is no longer valid
205 // or is it already informed by the session when it terminated?
206
207 // Subscription result.
208 if msg.SrvMsg.Ctrl.Code < 300 {
209 sess.sessionStoreLock.Lock()
210 // Make sure the session isn't gone yet.
211 if session := globals.sessionStore.Get(msg.OrigSid); session != nil {
212 // Successful subscriptions.
213 t.addSession(session, msg.SrvMsg.uid, types.IsChannel(msg.SrvMsg.Ctrl.Topic))
214 session.addSub(t.name, &Subscription{
215 broadcast: t.clientMsg,
216 done: t.unreg,
217 meta: t.meta,
218 supd: t.supd,
219 })
220 }
221 sess.sessionStoreLock.Unlock()
222
223 killTimer.Stop()
224 } else if len(t.sessions) == 0 {
225 killTimer.Reset(keepAlive)
226 }
227 }
228 case ProxyReqBroadcast, ProxyReqMeta, ProxyReqCall:
229 // no processing
230 case ProxyReqLeave:
231 if msg.SrvMsg != nil && msg.SrvMsg.Ctrl != nil {
232 if msg.SrvMsg.Ctrl.Code < 300 {
233 if sess != nil {

Callers 1

runProxyMethod · 0.95

Calls 13

updateAcsFromPresMsgMethod · 0.95
handleProxyBroadcastMethod · 0.95
proxyCtrlBroadcastMethod · 0.95
addSessionMethod · 0.95
remSessionMethod · 0.95
IsChannelFunction · 0.92
LockMethod · 0.80
addSubMethod · 0.80
UnlockMethod · 0.80
queueOutMethod · 0.80
GetMethod · 0.65
StopMethod · 0.65

Tested by

no test coverage detected