Enters a single blocking wait for notifications on the underlying listener. Waiting for a notification locks an underlying connection, so infrastructure elsewhere in the notifier must preempt it by sending to `n.waitInterruptChan` and invoking `n.waitCancel()`. Cancelling the input context (as occurs during shutdown) also unblocks the wait.
(ctx context.Context)
| 365 | // and invoking `n.waitCancel()`. Cancelling the input context (as occurs during |
| 366 | // shutdown) also unblocks the wait. |
| 367 | func (n *Notifier) waitOnce(ctx context.Context) error { |
| 368 | n.withLock(func() { |
| 369 | n.isWaiting = true |
| 370 | ctx, n.waitCancel = context.WithCancel(ctx) //nolint:fatcontext |
| 371 | }) |
| 372 | defer n.withLock(func() { |
| 373 | n.isWaiting = false |
| 374 | n.waitCancel() |
| 375 | }) |
| 376 | |
| 377 | // Save a reference to the parent context before creating the inner |
| 378 | // cancellable context. The inner context is cancelled by drainErrChan to |
| 379 | // interrupt WaitForNotification, but we still need a live context for the |
| 380 | // Ping health check afterward. |
| 381 | pingCtx := ctx |
| 382 | |
| 383 | ctx, cancel := context.WithCancel(ctx) |
| 384 | defer cancel() |
| 385 | |
| 386 | errChan := make(chan error) |
| 387 | |
| 388 | go func() { |
| 389 | for { |
| 390 | notification, err := n.listener.WaitForNotification(ctx) |
| 391 | if err != nil { |
| 392 | errChan <- err |
| 393 | return |
| 394 | } |
| 395 | |
| 396 | select { |
| 397 | case n.notificationBuf <- notification: |
| 398 | default: |
| 399 | n.Logger.WarnContext(ctx, n.Name+": Dropping notification due to full buffer", "payload", notification.Payload) |
| 400 | } |
| 401 | } |
| 402 | }() |
| 403 | |
| 404 | drainErrChan := func() error { |
| 405 | cancel() |
| 406 | |
| 407 | // There's a chance we encounter some other error before the context.Canceled comes in: |
| 408 | err := <-errChan |
| 409 | if err != nil && !errors.Is(err, context.Canceled) { |
| 410 | // A non-cancel error means something went wrong with the conn, so we should bail. |
| 411 | n.Logger.ErrorContext(ctx, n.Name+": Error on draining notification wait", "err", err) |
| 412 | return err |
| 413 | } |
| 414 | // If we got a context cancellation error, it means we successfully |
| 415 | // interrupted the WaitForNotification so that we could make the |
| 416 | // subscription change. |
| 417 | return nil |
| 418 | } |
| 419 | |
| 420 | pingInterval := cmp.Or(n.testPingInterval, 5*time.Second) |
| 421 | needPingCtx, needPingCancel := context.WithTimeout(ctx, pingInterval) |
| 422 | defer needPingCancel() |
| 423 | |
| 424 | // * Wait for notifications |
no test coverage detected