hub / github.com/riverqueue/river / fetchPollLoop

Method fetchPollLoop

producer.go:609–626

Loops every FetchPollInterval to check for jobs. This is meant as a backup in case something with listen/notify didn't work, or in case the fetch limiter was limited so that there are still jobs to pick up. It's also important in poll-only mode.

(ctx context.Context, wg *sync.WaitGroup)

Source from the content-addressed store, hash-verified

// limited so there's still jobs to pick up, and it's also important in
// poll-only mode.
func (p *producer) fetchPollLoop(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()

	fetchPollTimer := time.NewTimer(p.jitteredFetchPollInterval())
	for {
		select {
		case <-ctx.Done():
			// Stop fetch timer so no more fetches are triggered.
			if !fetchPollTimer.Stop() {
				<-fetchPollTimer.C
			}
			return
		case <-fetchPollTimer.C:
			p.fetchLimiter.Call()
			fetchPollTimer.Reset(p.jitteredFetchPollInterval())
		}
	}
}

// jitteredFetchPollInterval returns FetchPollInterval with random jitter in
// [0, 10% of FetchPollInterval) added (minimum 10ms). This prevents multiple

Callers 1

StartWorkContext · Method · 0.95

Calls 4

Done · Method · 0.80
Call · Method · 0.80
Stop · Method · 0.65

Tested by

no test coverage detected