(ctx context.Context)
| 45 | } |
| 46 | |
| 47 | func (sm *subscriptionManager) Start(ctx context.Context) error { |
| 48 | ctx, shouldStart, started, stopped := sm.StartInit(ctx) |
| 49 | if !shouldStart { |
| 50 | return nil |
| 51 | } |
| 52 | |
| 53 | go func() { |
| 54 | started() |
| 55 | defer stopped() // this defer should come first so it's last out |
| 56 | |
| 57 | sm.Logger.DebugContext(ctx, sm.Name+": Run loop started") |
| 58 | defer sm.Logger.DebugContext(ctx, sm.Name+": Run loop stopped") |
| 59 | |
| 60 | // On shutdown, close and remove all active subscriptions. |
| 61 | defer func() { |
| 62 | sm.mu.Lock() |
| 63 | defer sm.mu.Unlock() |
| 64 | |
| 65 | for subID, sub := range sm.subscriptions { |
| 66 | close(sub.Chan) |
| 67 | delete(sm.subscriptions, subID) |
| 68 | } |
| 69 | }() |
| 70 | |
| 71 | for { |
| 72 | select { |
| 73 | case <-ctx.Done(): |
| 74 | // Distribute remaining subscriptions until the channel is |
| 75 | // closed. This does make the subscription manager a little |
| 76 | // problematic in that it requires the subscription channel to |
| 77 | // be closed before it will fully stop. This always happens in |
| 78 | // the case of a real client by virtue of the completer always |
| 79 | // stopping at the same time as the subscription manager, but |
| 80 | // one has to be careful in tests. |
| 81 | sm.Logger.DebugContext(ctx, sm.Name+": Stopping; distributing subscriptions until channel is closed") |
| 82 | for updates := range sm.subscribeCh { |
| 83 | sm.distributeJobUpdates(ctx, updates) |
| 84 | } |
| 85 | |
| 86 | return |
| 87 | |
| 88 | case updates := <-sm.subscribeCh: |
| 89 | sm.distributeJobUpdates(ctx, updates) |
| 90 | } |
| 91 | } |
| 92 | }() |
| 93 | |
| 94 | return nil |
| 95 | } |
| 96 | |
| 97 | func (sm *subscriptionManager) logStats(ctx context.Context, svcName string) { |
| 98 | sm.statsMu.Lock() |
nothing calls this directly
no test coverage detected