MCPcopy
hub / github.com/tinode/chat / topicInit

Function topicInit

server/init_topic.go:21–135  ·  view source on GitHub ↗

topicInit reads an existing topic from the database or creates a new topic.

(t *Topic, join *ClientComMessage, h *Hub)

Source from the content-addressed store, hash-verified

19
20// topicInit reads an existing topic from database or creates a new topic
21func topicInit(t *Topic, join *ClientComMessage, h *Hub) {
22 var subscribeReqIssued bool
23 defer func() {
24 if !subscribeReqIssued && join.Sub != nil && join.sess.inflightReqs != nil {
25 // If it was a client initiated subscribe request and we failed it.
26 join.sess.inflightReqs.Done()
27 }
28 }()
29
30 timestamp := types.TimeNow()
31
32 var err error
33 switch {
34 case t.xoriginal == "me":
35 // Request to load a 'me' topic. The topic always exists, the subscription is never new.
36 err = initTopicMe(t, join)
37 case t.xoriginal == "fnd":
38 // Request to load a 'find' topic. The topic always exists, the subscription is never new.
39 err = initTopicFnd(t, join)
40 case strings.HasPrefix(t.xoriginal, "usr") || strings.HasPrefix(t.xoriginal, "p2p"):
41 // Request to load an existing or create a new p2p topic, then attach to it.
42 err = initTopicP2P(t, join)
43 case strings.HasPrefix(t.xoriginal, "new"):
44 // Processing request to create a new group topic.
45 err = initTopicNewGrp(t, join, false)
46 case strings.HasPrefix(t.xoriginal, "nch"):
47 // Processing request to create a new channel.
48 err = initTopicNewGrp(t, join, true)
49 case strings.HasPrefix(t.xoriginal, "grp") || strings.HasPrefix(t.xoriginal, "chn"):
50 // Load existing group topic (or channel).
51 err = initTopicGrp(t)
52 case t.xoriginal == "sys":
53 // Initialize system topic.
54 err = initTopicSys(t)
55 case t.xoriginal == "slf":
56 // Initialize self (notes and saved messages) topic.
57 err = initTopicSlf(t, join)
58 default:
59 // Unrecognized topic name
60 err = types.ErrTopicNotFound
61 }
62
63 // Failed to create or load the topic.
64 if err != nil {
65 // Remove topic from cache to prevent hub from forwarding more messages to it.
66 h.topicDel(join.RcptTo)
67
68 logs.Err.Println("init_topic: failed to load or create topic:", join.RcptTo, err)
69 join.sess.queueOut(decodeStoreErrorExplicitTs(err, join.Id, t.xoriginal, timestamp, join.Timestamp, nil))
70
71 // Re-queue pending requests to join the topic.
72 for len(t.reg) > 0 {
73 h.join <- (<-t.reg)
74 }
75
76 // Reject all other pending requests
77 for len(t.clientMsg) > 0 {
78 msg := <-t.clientMsg

Callers 1

run (Method) · 0.85

Calls 15

TimeNow (Function) · 0.92
initTopicMe (Function) · 0.85
initTopicFnd (Function) · 0.85
initTopicP2P (Function) · 0.85
initTopicNewGrp (Function) · 0.85
initTopicGrp (Function) · 0.85
initTopicSys (Function) · 0.85
initTopicSlf (Function) · 0.85
ErrLockedExplicitTs (Function) · 0.85
ErrLockedReply (Function) · 0.85
statsInc (Function) · 0.85

Tested by

no test coverage detected

Used in the wild: real call sites across dependent graphs

searching dependent graphs…