MCPcopy Index your code
hub / github.com/riverqueue/river / Start

Method Start

internal/notifier/notifier.go:109–166  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

107}
108
109func (n *Notifier) Start(ctx context.Context) error {
110 ctx, shouldStart, started, stopped := n.StartInit(ctx)
111 if !shouldStart {
112 return nil
113 }
114
115 // The loop below will connect/close on every iteration, but do one initial
116 // connect so the notifier fails fast in case of an obvious problem.
117 if err := n.listenerConnect(ctx, false); err != nil {
118 stopped()
119 if errors.Is(err, context.Canceled) {
120 return nil
121 }
122 return err
123 }
124
125 go func() {
126 started()
127 defer stopped()
128
129 n.Logger.DebugContext(ctx, n.Name+": Run loop started")
130 defer n.Logger.DebugContext(ctx, n.Name+": Run loop stopped")
131
132 n.withLock(func() { n.isStarted = true })
133 defer n.withLock(func() { n.isStarted = false })
134
135 defer n.listenerClose(ctx, false)
136
137 var wg sync.WaitGroup
138
139 wg.Go(func() {
140 n.deliverNotifications(ctx)
141 })
142
143 for attempt := 0; ; attempt++ {
144 if err := n.listenAndWait(ctx); err != nil {
145 if errors.Is(err, context.Canceled) {
146 break
147 }
148
149 sleepDuration := serviceutil.ExponentialBackoff(attempt, serviceutil.MaxAttemptsBeforeResetDefault)
150 n.Logger.ErrorContext(ctx, n.Name+": Error running listener (will attempt reconnect after backoff)",
151 slog.Int("attempt", attempt),
152 slog.String("err", err.Error()),
153 slog.String("sleep_duration", sleepDuration.String()),
154 )
155 n.testSignals.BackoffError.Signal(err)
156 if !n.testDisableSleep {
157 serviceutil.CancellableSleep(ctx, sleepDuration)
158 }
159 }
160 }
161
162 wg.Wait()
163 }()
164
165 return nil
166}

Callers

nothing calls this directly

Calls 11

listenerConnectMethod · 0.95
withLockMethod · 0.95
listenerCloseMethod · 0.95
deliverNotificationsMethod · 0.95
listenAndWaitMethod · 0.95
ExponentialBackoffFunction · 0.92
CancellableSleepFunction · 0.92
StartInitMethod · 0.80
SignalMethod · 0.80
IsMethod · 0.45
ErrorMethod · 0.45

Tested by

no test coverage detected