Loops every FetchPollInterval to check for jobs. This is meant as a back up in case something with listen/notify didn't work, or the fetch limiter was limited so there's still jobs to pick up, and it's also important in poll-only mode.
(ctx context.Context, wg *sync.WaitGroup)
| 607 | // limited so there's still jobs to pick up, and it's also important in |
| 608 | // poll-only mode. |
| 609 | func (p *producer) fetchPollLoop(ctx context.Context, wg *sync.WaitGroup) { |
| 610 | defer wg.Done() |
| 611 | |
| 612 | fetchPollTimer := time.NewTimer(p.jitteredFetchPollInterval()) |
| 613 | for { |
| 614 | select { |
| 615 | case <-ctx.Done(): |
| 616 | // Stop fetch timer so no more fetches are triggered. |
| 617 | if !fetchPollTimer.Stop() { |
| 618 | <-fetchPollTimer.C |
| 619 | } |
| 620 | return |
| 621 | case <-fetchPollTimer.C: |
| 622 | p.fetchLimiter.Call() |
| 623 | fetchPollTimer.Reset(p.jitteredFetchPollInterval()) |
| 624 | } |
| 625 | } |
| 626 | } |
| 627 | |
| 628 | // jitteredFetchPollInterval returns FetchPollInterval with random jitter in |
| 629 | // [0, 10% of FetchPollInterval) added (minimum 10ms). This prevents multiple |
no test coverage detected