(ctx context.Context, wg *sync.WaitGroup, lastPaused bool, lastMetadata []byte)
| 910 | } |
| 911 | |
| 912 | func (p *producer) pollForSettingChanges(ctx context.Context, wg *sync.WaitGroup, lastPaused bool, lastMetadata []byte) { |
| 913 | defer wg.Done() |
| 914 | |
| 915 | ticker := time.NewTicker(p.config.QueuePollInterval) |
| 916 | defer ticker.Stop() |
| 917 | for { |
| 918 | select { |
| 919 | case <-ctx.Done(): |
| 920 | return |
| 921 | case <-ticker.C: |
| 922 | updatedQueue, err := func() (*rivertype.Queue, error) { |
| 923 | ctx, cancel := context.WithTimeout(ctx, 10*time.Second) |
| 924 | defer cancel() |
| 925 | |
| 926 | return p.exec.QueueGet(ctx, &riverdriver.QueueGetParams{ |
| 927 | Name: p.config.Queue, |
| 928 | Schema: p.config.Schema, |
| 929 | }) |
| 930 | }() |
| 931 | if err != nil { |
| 932 | // Don't log if this is part of a standard shutdown. |
| 933 | if !errors.Is(context.Cause(ctx), startstop.ErrStop) { |
| 934 | p.Logger.ErrorContext(ctx, p.Name+": Error fetching queue settings", slog.String("err", err.Error())) |
| 935 | } |
| 936 | continue |
| 937 | } |
| 938 | |
| 939 | if updatedQueue == nil { |
| 940 | p.Logger.ErrorContext(ctx, p.Name+": Queue row not found when polling for setting changes", slog.String("queue", p.config.Queue)) |
| 941 | continue |
| 942 | } |
| 943 | |
| 944 | // Look for a change in the paused state: |
| 945 | shouldBePaused := (updatedQueue.PausedAt != nil) |
| 946 | if lastPaused != shouldBePaused { |
| 947 | action := controlActionPause |
| 948 | if !shouldBePaused { |
| 949 | action = controlActionResume |
| 950 | } |
| 951 | payload := &controlEventPayload{ |
| 952 | Action: action, |
| 953 | Queue: p.config.Queue, |
| 954 | } |
| 955 | p.Logger.DebugContext(ctx, p.Name+": Queue control state changed from polling", |
| 956 | slog.String("queue", p.config.Queue), |
| 957 | slog.String("action", string(action)), |
| 958 | slog.Bool("paused", shouldBePaused), |
| 959 | ) |
| 960 | |
| 961 | select { |
| 962 | case p.queueControlCh <- payload: |
| 963 | lastPaused = shouldBePaused |
| 964 | default: |
| 965 | p.Logger.WarnContext(ctx, p.Name+": Queue control notification dropped due to full buffer", slog.String("action", string(action))) |
| 966 | } |
| 967 | } |
| 968 | |
| 969 | // Look for a change in the queue's metadata: |
no test coverage detected