MCPcopy Index your code
hub / github.com/bee-queue/bee-queue / process

Method process

lib/queue.js:662–754  ·  view source on GitHub ↗
(concurrency, handler)

Source from the content-addressed store, hash-verified

660 }
661
662 process(concurrency, handler) {
663 if (!this.settings.isWorker) {
664 throw new Error('Cannot call Queue#process on a non-worker');
665 }
666
667 if (this.handler) {
668 throw new Error('Cannot call Queue#process twice');
669 }
670
671 if (this.paused) {
672 throw new Error('closed');
673 }
674
675 if (typeof concurrency === 'function') {
676 handler = concurrency;
677 concurrency = defaults['#process'].concurrency;
678 }
679
680 // If the handler throws a synchronous exception (only applicable to
681 // non-`async` functions), catch it, and fail the job.
682 const catchExceptions = true;
683 this.handler = helpers.wrapAsync(handler, catchExceptions);
684 this.running = 0;
685 this.queued = 1;
686 this.concurrency = concurrency;
687
688 const jobTick = () => {
689 if (this.paused) {
690 this.queued -= 1;
691 return;
692 }
693
694 // invariant: in this code path, this.running < this.concurrency, always
695 // after spoolup, this.running + this.queued === this.concurrency
696 finally_(
697 this._getNextJob().then((job) => {
698 // We're shutting down.
699 if (this.paused) {
700 // This job will get picked up later as a stalled job if we happen
701 // to get here. We can't easily process this job because at this
702 // point Queue#close has already captured the activeJobs set in a
703 // Promise.all call, and we'd prefer to delay a job than
704 // half-process it.
705 this.queued -= 1;
706 return;
707 }
708
709 this.running += 1;
710 this.queued -= 1;
711 if (this.running + this.queued < this.concurrency) {
712 this.queued += 1;
713 setImmediate(jobTick);
714 }
715
716 if (!job) {
717 // Per comment in Queue#_waitForJob, this branch is possible when
718 // the job is removed before processing can take place, but after
719 // being initially acquired.

Callers 8

harness.jsFile · 0.80
harness.jsFile · 0.80
harness.jsFile · 0.80
queue-test.jsFile · 0.80
delay-test.jsFile · 0.80
ping.jsFile · 0.80
pong.jsFile · 0.80
worker.jsFile · 0.80

Calls 2

_doStalledJobCheckMethod · 0.95
_activateDelayedMethod · 0.95

Tested by

no test coverage detected