MCPcopy Index your code
hub / github.com/tinode/chat / TopicMaster

Method TopicMaster

server/cluster.go:469–613  ·  view source on GitHub ↗

TopicMaster is a gRPC endpoint which receives requests sent by proxy topic to master topic.

(msg *ClusterReq, rejected *bool)

Source from the content-addressed store, hash-verified

467
468// TopicMaster is a gRPC endpoint which receives requests sent by proxy topic to master topic.
469func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error {
470 *rejected = false
471
472 node := c.nodes[msg.Node]
473 if node == nil {
474 logs.Warn.Println("cluster TopicMaster: request from an unknown node", msg.Node)
475 return nil
476 }
477
478 // Master maintains one multiplexing session per proxy topic per node.
479 // Except channel topics:
480 // * one multiplexing session for channel subscriptions.
481 // * one multiplexing session for group subscriptions.
482 var msid string
483 if msg.CliMsg != nil && types.IsChannel(msg.CliMsg.Original) {
484 // If it's a channel request, use channel name.
485 msid = msg.CliMsg.Original
486 } else {
487 msid = msg.RcptTo
488 }
489 // Append node name.
490 msid += "-" + msg.Node
491 msess := globals.sessionStore.Get(msid)
492
493 if msg.Gone {
494 // Proxy topic is gone. Tear down the local auxiliary session.
495 // If it was the last session, master topic will shut down as well.
496 node.stopMultiplexingSession(msess)
497
498 if t := globals.hub.topicGet(msg.RcptTo); t != nil && t.isChan {
499 // If it's a channel topic, also stop the "chnX-" local auxiliary session.
500 msidChn := types.GrpToChn(t.name) + "-" + msg.Node
501 node.stopMultiplexingSession(globals.sessionStore.Get(msidChn))
502 }
503
504 return nil
505 }
506
507 if msg.Signature != c.ring.Signature() {
508 logs.Warn.Println("cluster TopicMaster: session signature mismatch", msg.RcptTo)
509 *rejected = true
510 return nil
511 }
512
513 // Create a new multiplexing session if needed.
514 if msess == nil {
515 // If the session is not found, create it.
516 var count int
517 msess, count = globals.sessionStore.NewSession(node, msid)
518 node.lock.Lock()
519 node.msess[msid] = struct{}{}
520 node.lock.Unlock()
521
522 logs.Info.Println("cluster: multiplexing session started", msid, count)
523 msess.proxiedTopic = msg.RcptTo
524 }
525
526 // This is a local copy of a remote session.

Callers

nothing calls this directly

Calls 12

queueOutMethod · 0.95
IsChannelFunction · 0.92
GrpToChnFunction · 0.92
ErrUnknownReplyFunction · 0.85
PrintlnMethod · 0.80
topicGetMethod · 0.80
SignatureMethod · 0.80
NewSessionMethod · 0.80
LockMethod · 0.80
UnlockMethod · 0.80
GetMethod · 0.65

Tested by

no test coverage detected