(workCtx context.Context, count int, fetchResultCh chan<- producerFetchResult)
| 788 | } |
| 789 | |
| 790 | func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultCh chan<- producerFetchResult) { |
| 791 | // This intentionally removes any deadlines or cancellation from the parent |
| 792 | // context because we don't want it to get cancelled if the producer is asked |
| 793 | // to shut down. In that situation, we want to finish fetching any jobs we are |
| 794 | // in the midst of fetching, work them, and then stop. Otherwise we'd have a |
| 795 | // risk of shutting down when we had already fetched jobs in the database, |
| 796 | // leaving those jobs stranded. We'd then potentially have to release them |
| 797 | // back to the queue. |
| 798 | ctx := context.WithoutCancel(workCtx) |
| 799 | |
| 800 | // Maximum size of the `attempted_by` array on each job row. This maximum is |
| 801 | // rarely hit, but exists to protect against degenerate cases. |
| 802 | const maxAttemptedBy = 100 |
| 803 | |
| 804 | jobs, err := p.pilot.JobGetAvailable(ctx, p.exec, p.state, &riverdriver.JobGetAvailableParams{ |
| 805 | ClientID: p.config.ClientID, |
| 806 | MaxAttemptedBy: maxAttemptedBy, |
| 807 | MaxToLock: count, |
| 808 | Now: p.Time.NowOrNil(), |
| 809 | Queue: p.config.Queue, |
| 810 | ProducerID: p.id.Load(), |
| 811 | Schema: p.config.Schema, |
| 812 | }) |
| 813 | if err != nil { |
| 814 | fetchResultCh <- producerFetchResult{err: err} |
| 815 | return |
| 816 | } |
| 817 | |
| 818 | fetchResultCh <- producerFetchResult{jobs: jobs} |
| 819 | } |
| 820 | |
| 821 | // Periodically logs an informational log line giving some insight into the |
| 822 | // current state of the producer. |
no test coverage detected