(concurrency, handler)
| 660 | } |
| 661 | |
| 662 | process(concurrency, handler) { |
| 663 | if (!this.settings.isWorker) { |
| 664 | throw new Error('Cannot call Queue#process on a non-worker'); |
| 665 | } |
| 666 | |
| 667 | if (this.handler) { |
| 668 | throw new Error('Cannot call Queue#process twice'); |
| 669 | } |
| 670 | |
| 671 | if (this.paused) { |
| 672 | throw new Error('closed'); |
| 673 | } |
| 674 | |
| 675 | if (typeof concurrency === 'function') { |
| 676 | handler = concurrency; |
| 677 | concurrency = defaults['#process'].concurrency; |
| 678 | } |
| 679 | |
| 680 | // If the handler throws a synchronous exception (only applicable to |
| 681 | // non-`async` functions), catch it, and fail the job. |
| 682 | const catchExceptions = true; |
| 683 | this.handler = helpers.wrapAsync(handler, catchExceptions); |
| 684 | this.running = 0; |
| 685 | this.queued = 1; |
| 686 | this.concurrency = concurrency; |
| 687 | |
| 688 | const jobTick = () => { |
| 689 | if (this.paused) { |
| 690 | this.queued -= 1; |
| 691 | return; |
| 692 | } |
| 693 | |
| 694 | // invariant: in this code path, this.running < this.concurrency, always |
| 695 | // after spoolup, this.running + this.queued === this.concurrency |
| 696 | finally_( |
| 697 | this._getNextJob().then((job) => { |
| 698 | // We're shutting down. |
| 699 | if (this.paused) { |
| 700 | // This job will get picked up later as a stalled job if we happen |
| 701 | // to get here. We can't easily process this job because at this |
| 702 | // point Queue#close has already captured the activeJobs set in a |
| 703 | // Promise.all call, and we'd prefer to delay a job than |
| 704 | // half-process it. |
| 705 | this.queued -= 1; |
| 706 | return; |
| 707 | } |
| 708 | |
| 709 | this.running += 1; |
| 710 | this.queued -= 1; |
| 711 | if (this.running + this.queued < this.concurrency) { |
| 712 | this.queued += 1; |
| 713 | setImmediate(jobTick); |
| 714 | } |
| 715 | |
| 716 | if (!job) { |
| 717 | // Per comment in Queue#_waitForJob, this branch is possible when |
| 718 | // the job is removed before processing can take place, but after |
| 719 | // being initially acquired. |
no test coverage detected