topicInit reads an existing topic from database or creates a new topic
(t *Topic, join *ClientComMessage, h *Hub)
| 19 | |
| 20 | // topicInit reads an existing topic from database or creates a new topic |
| 21 | func topicInit(t *Topic, join *ClientComMessage, h *Hub) { |
| 22 | var subscribeReqIssued bool |
| 23 | defer func() { |
| 24 | if !subscribeReqIssued && join.Sub != nil && join.sess.inflightReqs != nil { |
| 25 | // If it was a client initiated subscribe request and we failed it. |
| 26 | join.sess.inflightReqs.Done() |
| 27 | } |
| 28 | }() |
| 29 | |
| 30 | timestamp := types.TimeNow() |
| 31 | |
| 32 | var err error |
| 33 | switch { |
| 34 | case t.xoriginal == "me": |
| 35 | // Request to load a 'me' topic. The topic always exists, the subscription is never new. |
| 36 | err = initTopicMe(t, join) |
| 37 | case t.xoriginal == "fnd": |
| 38 | // Request to load a 'find' topic. The topic always exists, the subscription is never new. |
| 39 | err = initTopicFnd(t, join) |
| 40 | case strings.HasPrefix(t.xoriginal, "usr") || strings.HasPrefix(t.xoriginal, "p2p"): |
| 41 | // Request to load an existing or create a new p2p topic, then attach to it. |
| 42 | err = initTopicP2P(t, join) |
| 43 | case strings.HasPrefix(t.xoriginal, "new"): |
| 44 | // Processing request to create a new group topic. |
| 45 | err = initTopicNewGrp(t, join, false) |
| 46 | case strings.HasPrefix(t.xoriginal, "nch"): |
| 47 | // Processing request to create a new channel. |
| 48 | err = initTopicNewGrp(t, join, true) |
| 49 | case strings.HasPrefix(t.xoriginal, "grp") || strings.HasPrefix(t.xoriginal, "chn"): |
| 50 | // Load existing group topic (or channel). |
| 51 | err = initTopicGrp(t) |
| 52 | case t.xoriginal == "sys": |
| 53 | // Initialize system topic. |
| 54 | err = initTopicSys(t) |
| 55 | case t.xoriginal == "slf": |
| 56 | // Initialize self (notes and saved messages) topic. |
| 57 | err = initTopicSlf(t, join) |
| 58 | default: |
| 59 | // Unrecognized topic name |
| 60 | err = types.ErrTopicNotFound |
| 61 | } |
| 62 | |
| 63 | // Failed to create or load the topic. |
| 64 | if err != nil { |
| 65 | // Remove topic from cache to prevent hub from forwarding more messages to it. |
| 66 | h.topicDel(join.RcptTo) |
| 67 | |
| 68 | logs.Err.Println("init_topic: failed to load or create topic:", join.RcptTo, err) |
| 69 | join.sess.queueOut(decodeStoreErrorExplicitTs(err, join.Id, t.xoriginal, timestamp, join.Timestamp, nil)) |
| 70 | |
| 71 | // Re-queue pending requests to join the topic. |
| 72 | for len(t.reg) > 0 { |
| 73 | h.join <- (<-t.reg) |
| 74 | } |
| 75 | |
| 76 | // Reject all other pending requests |
| 77 | for len(t.clientMsg) > 0 { |
| 78 | msg := <-t.clientMsg |
no test coverage detected
searching dependent graphs…