MCPcopy Index your code
hub / github.com/riverqueue/river / waitOnce

Method waitOnce

internal/notifier/notifier.go:367–459  ·  view source on GitHub ↗

Enters a single blocking wait for notifications on the underlying listener. Waiting for a notification locks an underlying connection, so infrastructure elsewhere in the notifier must preempt it by sending to `n.waitInterruptChan` and invoking `n.waitCancel()`. Cancelling the input context (as occurs during shutdown) also unblocks the wait.

(ctx context.Context)

Source from the content-addressed store, hash-verified

365// and invoking `n.waitCancel()`. Cancelling the input context (as occurs during
366// shutdown) also unblocks the wait.
367func (n *Notifier) waitOnce(ctx context.Context) error {
368 n.withLock(func() {
369 n.isWaiting = true
370 ctx, n.waitCancel = context.WithCancel(ctx) //nolint:fatcontext
371 })
372 defer n.withLock(func() {
373 n.isWaiting = false
374 n.waitCancel()
375 })
376
377 // Save a reference to the parent context before creating the inner
378 // cancellable context. The inner context is cancelled by drainErrChan to
379 // interrupt WaitForNotification, but we still need a live context for the
380 // Ping health check afterward.
381 pingCtx := ctx
382
383 ctx, cancel := context.WithCancel(ctx)
384 defer cancel()
385
386 errChan := make(chan error)
387
388 go func() {
389 for {
390 notification, err := n.listener.WaitForNotification(ctx)
391 if err != nil {
392 errChan <- err
393 return
394 }
395
396 select {
397 case n.notificationBuf <- notification:
398 default:
399 n.Logger.WarnContext(ctx, n.Name+": Dropping notification due to full buffer", "payload", notification.Payload)
400 }
401 }
402 }()
403
404 drainErrChan := func() error {
405 cancel()
406
407 // There's a chance we encounter some other error before the context.Canceled comes in:
408 err := <-errChan
409 if err != nil && !errors.Is(err, context.Canceled) {
410 // A non-cancel error means something went wrong with the conn, so we should bail.
411 n.Logger.ErrorContext(ctx, n.Name+": Error on draining notification wait", "err", err)
412 return err
413 }
414 // If we got a context cancellation error, it means we successfully
415 // interrupted the WaitForNotification so that we could make the
416 // subscription change.
417 return nil
418 }
419
420 pingInterval := cmp.Or(n.testPingInterval, 5*time.Second)
421 needPingCtx, needPingCancel := context.WithTimeout(ctx, pingInterval)
422 defer needPingCancel()
423
424 // * Wait for notifications

Callers 1

listenAndWaitMethod · 0.95

Calls 5

withLockMethod · 0.95
DoneMethod · 0.80
WaitForNotificationMethod · 0.65
PingMethod · 0.65
IsMethod · 0.45

Tested by

no test coverage detected