Accounts for users as members of an active topic. Used for cache management. In a cluster, this function is called only when the topic is local, i.e. when globals.cluster.isRemoteTopic(t.name) == false
(t *Topic, add bool)
| 869 | // In case of a cluster this method is called only when the topic is local: |
| 870 | // globals.cluster.isRemoteTopic(t.name) == false |
| 871 | func usersRegisterTopic(t *Topic, add bool) { |
| 872 | if globals.usersUpdate == nil { |
| 873 | return |
| 874 | } |
| 875 | |
| 876 | if t.cat == types.TopicCatFnd || t.cat == types.TopicCatMe { |
| 877 | // Ignoring me and fnd topics. |
| 878 | return |
| 879 | } |
| 880 | |
| 881 | local := &UserCacheReq{Inc: add} |
| 882 | |
| 883 | // In case of a cluster UIDs could be local and remote. Process local UIDs locally, |
| 884 | // send remote UIDs to other cluster nodes for processing. The UIDs may have to be |
| 885 | // sent to multiple nodes. |
| 886 | remote := &UserCacheReq{Inc: add} |
| 887 | for uid, pud := range t.perUser { |
| 888 | if pud.isChan { |
| 889 | // Skip channel subscribers. |
| 890 | continue |
| 891 | } |
| 892 | if globals.cluster.isRemoteTopic(uid.UserId()) { |
| 893 | remote.UserIdList = append(remote.UserIdList, uid) |
| 894 | } else { |
| 895 | local.UserIdList = append(local.UserIdList, uid) |
| 896 | } |
| 897 | } |
| 898 | |
| 899 | if len(remote.UserIdList) > 0 { |
| 900 | globals.cluster.routeUserReq(remote) |
| 901 | } |
| 902 | |
| 903 | if len(local.UserIdList) > 0 { |
| 904 | select { |
| 905 | case globals.usersUpdate <- local: |
| 906 | default: |
| 907 | logs.Err.Println("User cache: globals.usersUpdate queue full: ", len(globals.usersUpdate)) |
| 908 | } |
| 909 | } |
| 910 | } |
| 911 | |
| 912 | // usersRequestFromCluster handles requests which came from other cluster nodes. |
| 913 | func usersRequestFromCluster(req *UserCacheReq) { |
no test coverage detected
searching dependent graphs…