(workCtx context.Context, fetchResultCh chan producerFetchResult)
| 635 | } |
| 636 | |
| 637 | func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) { |
| 638 | var limit int |
| 639 | if p.paused { |
| 640 | limit = 0 |
| 641 | } else { |
| 642 | limit = p.maxJobsToFetch() |
| 643 | if limit <= 0 { |
| 644 | // We have no slots for new jobs, so don't bother fetching. However, since |
| 645 | // we knew it was time to fetch, we keep track of what happened so we can |
| 646 | // trigger another fetch as soon as we have open slots. |
| 647 | p.fetchWhenSlotsAreAvailable = true |
| 648 | return |
| 649 | } |
| 650 | } |
| 651 | |
| 652 | go p.dispatchWork(workCtx, limit, fetchResultCh) |
| 653 | |
| 654 | for { |
| 655 | select { |
| 656 | case result := <-fetchResultCh: |
| 657 | if result.err != nil { |
| 658 | p.Logger.ErrorContext(workCtx, p.Name+": Error fetching jobs", slog.String("err", result.err.Error()), slog.String("queue", p.config.Queue)) |
| 659 | } else if len(result.jobs) > 0 { |
| 660 | p.startNewExecutors(workCtx, result.jobs) |
| 661 | |
| 662 | if len(result.jobs) == limit { |
| 663 | // Fetch returned the maximum number of jobs that were requested, |
| 664 | // implying there may be more in the queue. Trigger another fetch when |
| 665 | // slots are available. |
| 666 | p.fetchWhenSlotsAreAvailable = true |
| 667 | } |
| 668 | } |
| 669 | return |
| 670 | case result := <-p.jobResultCh: |
| 671 | p.removeActiveJob(result) |
| 672 | case jobID := <-p.cancelCh: |
| 673 | p.maybeCancelJob(workCtx, jobID) |
| 674 | } |
| 675 | } |
| 676 | } |
| 677 | |
| 678 | func (p *producer) executorShutdownLoop() { |
| 679 | // No more jobs will be fetched or executed. However, we must wait for all |
no test coverage detected