Subscribe creates a subscription on a topic
(ctx context.Context, topic string)
| 69 | |
| 70 | // Subscribe creates a subscription on a topic |
| 71 | func (m *Module) Subscribe(ctx context.Context, topic string) (<-chan *redis.Message, error) { |
| 72 | m.lock.Lock() |
| 73 | defer m.lock.Unlock() |
| 74 | |
| 75 | // Check if subscription already exists. Return corresponding channel if it does. |
| 76 | if sub, p := m.mapping[topic]; p { |
| 77 | return sub.ch, nil |
| 78 | } |
| 79 | |
| 80 | // Make a redis subscription |
| 81 | pubsub := m.client.Subscribe(context.TODO(), m.getTopicName(topic)) |
| 82 | if _, err := pubsub.Receive(ctx); err != nil { |
| 83 | return nil, err |
| 84 | } |
| 85 | |
| 86 | // Make a channel to listen for subscriptions |
| 87 | ch := pubsub.Channel() |
| 88 | m.mapping[topic] = &subscription{ch, pubsub} |
| 89 | return ch, nil |
| 90 | } |
| 91 | |
| 92 | // CancelSubscription cancels an active subscription |
| 93 | func (m *Module) CancelSubscription(topic string) { |
nothing calls this directly
no test coverage detected