MCPcopy Index your code
hub / github.com/riverqueue/river / SubscribeConfig

Method SubscribeConfig

subscription_manager.go:223–267  ·  view source on GitHub ↗

SubscribeConfig is a special internal Subscribe variant that lets us inject an overridden size.

(config *SubscribeConfig)

Source from the content-addressed store, hash-verified

221// SubscribeConfig is a special internal Subscribe variant that lets us inject
222// an overridden size.
223func (sm *subscriptionManager) SubscribeConfig(config *SubscribeConfig) (<-chan *Event, func()) {
224 if config.ChanSize < 0 {
225 panic("SubscribeConfig.ChanSize must be greater or equal to 1")
226 }
227 if config.ChanSize == 0 {
228 config.ChanSize = subscribeChanSizeDefault
229 }
230
231 for _, kind := range config.Kinds {
232 if _, ok := allKinds[kind]; !ok {
233 panic(fmt.Errorf("unknown event kind: %s", kind))
234 }
235 }
236
237 subChan := make(chan *Event, config.ChanSize)
238
239 sm.mu.Lock()
240 defer sm.mu.Unlock()
241
242 // Just gives us an easy way of removing the subscription again later.
243 subID := sm.subscriptionsSeq
244 sm.subscriptionsSeq++
245
246 sm.subscriptions[subID] = &eventSubscription{
247 Chan: subChan,
248 Kinds: sliceutil.KeyBy(config.Kinds, func(k EventKind) (EventKind, struct{}) { return k, struct{}{} }),
249 }
250
251 cancel := func() {
252 sm.mu.Lock()
253 defer sm.mu.Unlock()
254
255 // May no longer be present in case this was called after a stop.
256 sub, ok := sm.subscriptions[subID]
257 if !ok {
258 return
259 }
260
261 close(sub.Chan)
262
263 delete(sm.subscriptions, subID)
264 }
265
266 return subChan, cancel
267}

Callers

nothing calls this directly

Calls 2

KeyByFunction · 0.92
ErrorfMethod · 0.65

Tested by

no test coverage detected