MCPcopy Index your code
hub / github.com/riverqueue/river / Start

Method Start

subscription_manager.go:47–95  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

45}
46
47func (sm *subscriptionManager) Start(ctx context.Context) error {
48 ctx, shouldStart, started, stopped := sm.StartInit(ctx)
49 if !shouldStart {
50 return nil
51 }
52
53 go func() {
54 started()
55 defer stopped() // this defer should come first so it's last out
56
57 sm.Logger.DebugContext(ctx, sm.Name+": Run loop started")
58 defer sm.Logger.DebugContext(ctx, sm.Name+": Run loop stopped")
59
60 // On shutdown, close and remove all active subscriptions.
61 defer func() {
62 sm.mu.Lock()
63 defer sm.mu.Unlock()
64
65 for subID, sub := range sm.subscriptions {
66 close(sub.Chan)
67 delete(sm.subscriptions, subID)
68 }
69 }()
70
71 for {
72 select {
73 case <-ctx.Done():
74 // Distribute remaining subscriptions until the channel is
75 // closed. This does make the subscription manager a little
76 // problematic in that it requires the subscription channel to
77 // be closed before it will fully stop. This always happens in
78 // the case of a real client by virtue of the completer always
79 // stopping at the same time as the subscription manager, but
80 // one has to be careful in tests.
81 sm.Logger.DebugContext(ctx, sm.Name+": Stopping; distributing subscriptions until channel is closed")
82 for updates := range sm.subscribeCh {
83 sm.distributeJobUpdates(ctx, updates)
84 }
85
86 return
87
88 case updates := <-sm.subscribeCh:
89 sm.distributeJobUpdates(ctx, updates)
90 }
91 }
92 }()
93
94 return nil
95}
96
97func (sm *subscriptionManager) logStats(ctx context.Context, svcName string) {
98 sm.statsMu.Lock()

Callers

nothing calls this directly

Calls 3

distributeJobUpdatesMethod · 0.95
StartInitMethod · 0.80
DoneMethod · 0.80

Tested by

no test coverage detected