TopicMaster is a gRPC endpoint which receives requests sent by proxy topic to master topic.
(msg *ClusterReq, rejected *bool)
| 467 | |
| 468 | // TopicMaster is a gRPC endpoint which receives requests sent by proxy topic to master topic. |
| 469 | func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error { |
| 470 | *rejected = false |
| 471 | |
| 472 | node := c.nodes[msg.Node] |
| 473 | if node == nil { |
| 474 | logs.Warn.Println("cluster TopicMaster: request from an unknown node", msg.Node) |
| 475 | return nil |
| 476 | } |
| 477 | |
| 478 | // Master maintains one multiplexing session per proxy topic per node. |
| 479 | // Except channel topics: |
| 480 | // * one multiplexing session for channel subscriptions. |
| 481 | // * one multiplexing session for group subscriptions. |
| 482 | var msid string |
| 483 | if msg.CliMsg != nil && types.IsChannel(msg.CliMsg.Original) { |
| 484 | // If it's a channel request, use channel name. |
| 485 | msid = msg.CliMsg.Original |
| 486 | } else { |
| 487 | msid = msg.RcptTo |
| 488 | } |
| 489 | // Append node name. |
| 490 | msid += "-" + msg.Node |
| 491 | msess := globals.sessionStore.Get(msid) |
| 492 | |
| 493 | if msg.Gone { |
| 494 | // Proxy topic is gone. Tear down the local auxiliary session. |
| 495 | // If it was the last session, master topic will shut down as well. |
| 496 | node.stopMultiplexingSession(msess) |
| 497 | |
| 498 | if t := globals.hub.topicGet(msg.RcptTo); t != nil && t.isChan { |
| 499 | // If it's a channel topic, also stop the "chnX-" local auxiliary session. |
| 500 | msidChn := types.GrpToChn(t.name) + "-" + msg.Node |
| 501 | node.stopMultiplexingSession(globals.sessionStore.Get(msidChn)) |
| 502 | } |
| 503 | |
| 504 | return nil |
| 505 | } |
| 506 | |
| 507 | if msg.Signature != c.ring.Signature() { |
| 508 | logs.Warn.Println("cluster TopicMaster: session signature mismatch", msg.RcptTo) |
| 509 | *rejected = true |
| 510 | return nil |
| 511 | } |
| 512 | |
| 513 | // Create a new multiplexing session if needed. |
| 514 | if msess == nil { |
| 515 | // If the session is not found, create it. |
| 516 | var count int |
| 517 | msess, count = globals.sessionStore.NewSession(node, msid) |
| 518 | node.lock.Lock() |
| 519 | node.msess[msid] = struct{}{} |
| 520 | node.lock.Unlock() |
| 521 | |
| 522 | logs.Info.Println("cluster: multiplexing session started", msid, count) |
| 523 | msess.proxiedTopic = msg.RcptTo |
| 524 | } |
| 525 | |
| 526 | // This is a local copy of a remote session. |
nothing calls this directly
no test coverage detected