MCPcopy Index your code
hub / github.com/riverqueue/river / pollForSettingChanges

Method pollForSettingChanges

producer.go:912–991  ·  view source on GitHub ↗
(ctx context.Context, wg *sync.WaitGroup, lastPaused bool, lastMetadata []byte)

Source from the content-addressed store, hash-verified

910}
911
912func (p *producer) pollForSettingChanges(ctx context.Context, wg *sync.WaitGroup, lastPaused bool, lastMetadata []byte) {
913 defer wg.Done()
914
915 ticker := time.NewTicker(p.config.QueuePollInterval)
916 defer ticker.Stop()
917 for {
918 select {
919 case <-ctx.Done():
920 return
921 case <-ticker.C:
922 updatedQueue, err := func() (*rivertype.Queue, error) {
923 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
924 defer cancel()
925
926 return p.exec.QueueGet(ctx, &riverdriver.QueueGetParams{
927 Name: p.config.Queue,
928 Schema: p.config.Schema,
929 })
930 }()
931 if err != nil {
932 // Don't log if this is part of a standard shutdown.
933 if !errors.Is(context.Cause(ctx), startstop.ErrStop) {
934 p.Logger.ErrorContext(ctx, p.Name+": Error fetching queue settings", slog.String("err", err.Error()))
935 }
936 continue
937 }
938
939 if updatedQueue == nil {
940 p.Logger.ErrorContext(ctx, p.Name+": Queue row not found when polling for setting changes", slog.String("queue", p.config.Queue))
941 continue
942 }
943
944 // Look for a change in the paused state:
945 shouldBePaused := (updatedQueue.PausedAt != nil)
946 if lastPaused != shouldBePaused {
947 action := controlActionPause
948 if !shouldBePaused {
949 action = controlActionResume
950 }
951 payload := &controlEventPayload{
952 Action: action,
953 Queue: p.config.Queue,
954 }
955 p.Logger.DebugContext(ctx, p.Name+": Queue control state changed from polling",
956 slog.String("queue", p.config.Queue),
957 slog.String("action", string(action)),
958 slog.Bool("paused", shouldBePaused),
959 )
960
961 select {
962 case p.queueControlCh <- payload:
963 lastPaused = shouldBePaused
964 default:
965 p.Logger.WarnContext(ctx, p.Name+": Queue control notification dropped due to full buffer", slog.String("action", string(action)))
966 }
967 }
968
969 // Look for a change in the queue's metadata:

Callers 1

StartWorkContextMethod · 0.95

Calls 7

metadataEqualFunction · 0.85
DoneMethod · 0.80
SignalMethod · 0.80
StopMethod · 0.65
QueueGetMethod · 0.65
IsMethod · 0.45
ErrorMethod · 0.45

Tested by

no test coverage detected