MCPcopy Index your code
hub / github.com/riverqueue/river / dispatchWork

Method dispatchWork

producer.go:790–819  ·  view source on GitHub ↗
(workCtx context.Context, count int, fetchResultCh chan<- producerFetchResult)

Source from the content-addressed store, hash-verified

788}
789
790func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultCh chan<- producerFetchResult) {
791 // This intentionally removes any deadlines or cancellation from the parent
792 // context because we don't want it to get cancelled if the producer is asked
793 // to shut down. In that situation, we want to finish fetching any jobs we are
794 // in the midst of fetching, work them, and then stop. Otherwise we'd have a
795 // risk of shutting down when we had already fetched jobs in the database,
796 // leaving those jobs stranded. We'd then potentially have to release them
797 // back to the queue.
798 ctx := context.WithoutCancel(workCtx)
799
800 // Maximum size of the `attempted_by` array on each job row. This maximum is
801 // rarely hit, but exists to protect against degenerate cases.
802 const maxAttemptedBy = 100
803
804 jobs, err := p.pilot.JobGetAvailable(ctx, p.exec, p.state, &riverdriver.JobGetAvailableParams{
805 ClientID: p.config.ClientID,
806 MaxAttemptedBy: maxAttemptedBy,
807 MaxToLock: count,
808 Now: p.Time.NowOrNil(),
809 Queue: p.config.Queue,
810 ProducerID: p.id.Load(),
811 Schema: p.config.Schema,
812 })
813 if err != nil {
814 fetchResultCh <- producerFetchResult{err: err}
815 return
816 }
817
818 fetchResultCh <- producerFetchResult{jobs: jobs}
819}
820
821// Periodically logs an informational log line giving some insight into the
822// current state of the producer.

Callers 1

innerFetchLoopMethod · 0.95

Calls 2

JobGetAvailableMethod · 0.65
NowOrNilMethod · 0.65

Tested by

no test coverage detected