MCPcopy Index your code
hub / github.com/riverqueue/river / innerFetchLoop

Method innerFetchLoop

producer.go:637–676  ·  view source on GitHub ↗
(workCtx context.Context, fetchResultCh chan producerFetchResult)

Source from the content-addressed store, hash-verified

635}
636
637func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) {
638 var limit int
639 if p.paused {
640 limit = 0
641 } else {
642 limit = p.maxJobsToFetch()
643 if limit <= 0 {
644 // We have no slots for new jobs, so don't bother fetching. However, since
645 // we knew it was time to fetch, we keep track of what happened so we can
646 // trigger another fetch as soon as we have open slots.
647 p.fetchWhenSlotsAreAvailable = true
648 return
649 }
650 }
651
652 go p.dispatchWork(workCtx, limit, fetchResultCh)
653
654 for {
655 select {
656 case result := <-fetchResultCh:
657 if result.err != nil {
658 p.Logger.ErrorContext(workCtx, p.Name+": Error fetching jobs", slog.String("err", result.err.Error()), slog.String("queue", p.config.Queue))
659 } else if len(result.jobs) > 0 {
660 p.startNewExecutors(workCtx, result.jobs)
661
662 if len(result.jobs) == limit {
663 // Fetch returned the maximum number of jobs that were requested,
664 // implying there may be more in the queue. Trigger another fetch when
665 // slots are available.
666 p.fetchWhenSlotsAreAvailable = true
667 }
668 }
669 return
670 case result := <-p.jobResultCh:
671 p.removeActiveJob(result)
672 case jobID := <-p.cancelCh:
673 p.maybeCancelJob(workCtx, jobID)
674 }
675 }
676}
677
678func (p *producer) executorShutdownLoop() {
679 // No more jobs will be fetched or executed. However, we must wait for all

Callers 1

fetchAndRunLoopMethod · 0.95

Calls 6

maxJobsToFetchMethod · 0.95
dispatchWorkMethod · 0.95
startNewExecutorsMethod · 0.95
removeActiveJobMethod · 0.95
maybeCancelJobMethod · 0.95
ErrorMethod · 0.45

Tested by

no test coverage detected