SubscribeConfig is a special internal Subscribe variant that lets us inject an overridden channel size.
(config *SubscribeConfig)
| 221 | // SubscribeConfig is a special internal Subscribe variant that lets us inject |
| 222 | // an overridden size. |
| 223 | func (sm *subscriptionManager) SubscribeConfig(config *SubscribeConfig) (<-chan *Event, func()) { |
| 224 | if config.ChanSize < 0 { |
| 225 | panic("SubscribeConfig.ChanSize must be greater or equal to 1") |
| 226 | } |
| 227 | if config.ChanSize == 0 { |
| 228 | config.ChanSize = subscribeChanSizeDefault |
| 229 | } |
| 230 | |
| 231 | for _, kind := range config.Kinds { |
| 232 | if _, ok := allKinds[kind]; !ok { |
| 233 | panic(fmt.Errorf("unknown event kind: %s", kind)) |
| 234 | } |
| 235 | } |
| 236 | |
| 237 | subChan := make(chan *Event, config.ChanSize) |
| 238 | |
| 239 | sm.mu.Lock() |
| 240 | defer sm.mu.Unlock() |
| 241 | |
| 242 | // Just gives us an easy way of removing the subscription again later. |
| 243 | subID := sm.subscriptionsSeq |
| 244 | sm.subscriptionsSeq++ |
| 245 | |
| 246 | sm.subscriptions[subID] = &eventSubscription{ |
| 247 | Chan: subChan, |
| 248 | Kinds: sliceutil.KeyBy(config.Kinds, func(k EventKind) (EventKind, struct{}) { return k, struct{}{} }), |
| 249 | } |
| 250 | |
| 251 | cancel := func() { |
| 252 | sm.mu.Lock() |
| 253 | defer sm.mu.Unlock() |
| 254 | |
| 255 | // May no longer be present in case this was called after a stop. |
| 256 | sub, ok := sm.subscriptions[subID] |
| 257 | if !ok { |
| 258 | return |
| 259 | } |
| 260 | |
| 261 | close(sub.Chan) |
| 262 | |
| 263 | delete(sm.subscriptions, subID) |
| 264 | } |
| 265 | |
| 266 | return subChan, cancel |
| 267 | } |