(ctx context.Context)
| 107 | } |
| 108 | |
| 109 | func (n *Notifier) Start(ctx context.Context) error { |
| 110 | ctx, shouldStart, started, stopped := n.StartInit(ctx) |
| 111 | if !shouldStart { |
| 112 | return nil |
| 113 | } |
| 114 | |
| 115 | // The loop below will connect/close on every iteration, but do one initial |
| 116 | // connect so the notifier fails fast in case of an obvious problem. |
| 117 | if err := n.listenerConnect(ctx, false); err != nil { |
| 118 | stopped() |
| 119 | if errors.Is(err, context.Canceled) { |
| 120 | return nil |
| 121 | } |
| 122 | return err |
| 123 | } |
| 124 | |
| 125 | go func() { |
| 126 | started() |
| 127 | defer stopped() |
| 128 | |
| 129 | n.Logger.DebugContext(ctx, n.Name+": Run loop started") |
| 130 | defer n.Logger.DebugContext(ctx, n.Name+": Run loop stopped") |
| 131 | |
| 132 | n.withLock(func() { n.isStarted = true }) |
| 133 | defer n.withLock(func() { n.isStarted = false }) |
| 134 | |
| 135 | defer n.listenerClose(ctx, false) |
| 136 | |
| 137 | var wg sync.WaitGroup |
| 138 | |
| 139 | wg.Go(func() { |
| 140 | n.deliverNotifications(ctx) |
| 141 | }) |
| 142 | |
| 143 | for attempt := 0; ; attempt++ { |
| 144 | if err := n.listenAndWait(ctx); err != nil { |
| 145 | if errors.Is(err, context.Canceled) { |
| 146 | break |
| 147 | } |
| 148 | |
| 149 | sleepDuration := serviceutil.ExponentialBackoff(attempt, serviceutil.MaxAttemptsBeforeResetDefault) |
| 150 | n.Logger.ErrorContext(ctx, n.Name+": Error running listener (will attempt reconnect after backoff)", |
| 151 | slog.Int("attempt", attempt), |
| 152 | slog.String("err", err.Error()), |
| 153 | slog.String("sleep_duration", sleepDuration.String()), |
| 154 | ) |
| 155 | n.testSignals.BackoffError.Signal(err) |
| 156 | if !n.testDisableSleep { |
| 157 | serviceutil.CancellableSleep(ctx, sleepDuration) |
| 158 | } |
| 159 | } |
| 160 | } |
| 161 | |
| 162 | wg.Wait() |
| 163 | }() |
| 164 | |
| 165 | return nil |
| 166 | } |
nothing calls this directly
no test coverage detected