MCPcopy
hub / github.com/sindresorhus/p-queue

github.com/sindresorhus/p-queue @v9.3.0 sqlite

repository ↗ · DeepWiki ↗ · release v9.3.0 ↗
77 symbols 235 edges 14 files 18 documented · 23%
README

p-queue

Promise queue with concurrency control

Useful for rate-limiting async (or sync) operations. For example, when interacting with a REST API or when doing CPU/memory intensive tasks.

For servers, you probably want a Redis-backed job queue instead.

Note that the project is feature complete. We are happy to review pull requests, but we don't plan any further development. We are also not answering email support questions.


        <sup>
            <a href="https://github.com/sponsors/sindresorhus">Sindre's open source work is supported by the community</a>

Special thanks to:

    <a href="https://fetchfox.ai?ref=sindre">



            <img src="https://sindresorhus.com/assets/thanks/fetchfox-logo.svg" height="200"/>



        <b>Scrape anything with FetchFox</b>



            <sup>FetchFox is an AI powered scraping tool that lets you scrape data from any website</sup>



    </a>

Install

npm install p-queue

Warning: This package is native ESM and no longer provides a CommonJS export. If your project uses CommonJS, you'll have to convert to ESM. Please don't open issues for questions regarding CommonJS / ESM.

Usage

Here we run only one promise at a time. For example, set concurrency to 4 to run four promises at the same time.

import PQueue from 'p-queue';
import got from 'got';

const queue = new PQueue({concurrency: 1});

(async () => {
    await queue.add(() => got('https://sindresorhus.com'));
    console.log('Done: sindresorhus.com');
})();

(async () => {
    await queue.add(() => got('https://avajs.dev'));
    console.log('Done: avajs.dev');
})();

API

PQueue(options?)

Returns a new queue instance, which is an EventEmitter3 subclass.

options

Type: object

concurrency

Type: number\ Default: Infinity\ Minimum: 1

Concurrency limit.

timeout

Type: number\ Default: undefined

Per-operation timeout in milliseconds. Operations will throw a TimeoutError if they don't complete within the specified time.

The timeout begins when the operation is dequeued and starts execution, not while it's waiting in the queue.

Can be overridden per task using the timeout option in .add():

const queue = new PQueue({timeout: 5000});

// This task uses the global 5s timeout
await queue.add(() => fetchData());

// This task has a 10s timeout
await queue.add(() => slowTask(), {timeout: 10000});
autoStart

Type: boolean\ Default: true

Whether queue tasks within the concurrency limit are auto-executed as soon as they're added.

queueClass

Type: Function

Class with a enqueue and dequeue method, and a size getter. See the Custom QueueClass section.

intervalCap

Type: number\ Default: Infinity\ Minimum: 1

The max number of runs in the given interval of time.

interval

Type: number\ Default: 0\ Minimum: 0

The length of time in milliseconds before the interval count resets. Must be finite.

carryoverIntervalCount

Type: boolean\ Default: false

If true, specifies that any pending Promises, should be carried over into the next interval and counted against the intervalCap. If false, any of those pending Promises will not count towards the next intervalCap.

strict

Type: boolean\ Default: false

Whether to use strict mode for rate limiting (sliding window algorithm).

When enabled, ensures that no more than intervalCap tasks execute in any rolling interval window, rather than resetting the count at fixed intervals. This provides more predictable and evenly distributed execution.

For example, with intervalCap: 2 and interval: 1000: - Default mode (fixed window): Tasks can burst at window boundaries. You could execute 2 tasks at 999ms and 2 more at 1000ms, resulting in 4 tasks within 1ms. - Strict mode (sliding window): Enforces that no more than 2 tasks execute in any 1000ms rolling window, preventing bursts.

[!NOTE] Strict mode is more resource-intensive as it tracks individual execution timestamps. Use it when you need guaranteed rate-limit compliance, such as when interacting with APIs that enforce strict rate limits.

[!NOTE] The carryoverIntervalCount option has no effect when strict mode is enabled, as strict mode tracks actual execution timestamps rather than counting pending tasks.

queue

PQueue instance.

.add(fn, options?)

Adds a sync or async task to the queue.

Returns a promise that settles when the task completes, not when it's added to the queue. The promise resolves with the return value of fn.

[!IMPORTANT] If you await this promise, you will wait for the task to finish running, which may defeat the purpose of using a queue for concurrency. See the Usage section for examples.

[!NOTE] If your items can potentially throw an exception, you must handle those errors from the returned Promise or they may be reported as an unhandled Promise rejection and potentially cause your process to exit immediately.

fn

Type: Function

Promise-returning/async function. When executed, it will receive {signal} as the first argument.

options

Type: object

priority

Type: number\ Default: 0

Priority of operation. Operations with greater priority will be scheduled first.

id

Type string

Unique identifier for the promise function, used to update its priority before execution. If not specified, it is auto-assigned an incrementing BigInt starting from 1n.

signal

AbortSignal for cancellation of the operation. When aborted, it will be removed from the queue and the queue.add() call will reject with an error. If the operation is already running, the signal will need to be handled by the operation itself.

import PQueue from 'p-queue';
import got, {CancelError} from 'got';

const queue = new PQueue();

const controller = new AbortController();

try {
    await queue.add(({signal}) => {
        const request = got('https://sindresorhus.com');

        signal.addEventListener('abort', () => {
            request.cancel();
        });

        try {
            return await request;
        } catch (error) {
            if (!(error instanceof CancelError)) {
                throw error;
            }
        }
    }, {signal: controller.signal});
} catch (error) {
    if (!(error instanceof DOMException)) {
        throw error;
    }
}

.addAll(fns, options?)

Same as .add(), but accepts an array of sync or async functions and returns a promise that resolves when all functions are resolved.

.pause()

Put queue execution on hold.

.start()

Start (or resume) executing enqueued tasks within concurrency limit. No need to call this if queue is not paused (via options.autoStart = false or by .pause() method.)

Returns this (the instance).

.onEmpty()

Returns a promise that settles when the queue becomes empty.

Can be called multiple times. Useful if you for example add additional items at a later time.

[!NOTE] The promise returned by .onEmpty() resolves once when the queue becomes empty. If you want to be notified every time the queue becomes empty, use the empty event instead: queue.on('empty', () => {}).

.onIdle()

Returns a promise that settles when the queue becomes empty, and all promises have completed; queue.size === 0 && queue.pending === 0.

The difference with .onEmpty is that .onIdle guarantees that all work from the queue has finished. .onEmpty merely signals that the queue is empty, but it could mean that some promises haven't completed yet.

[!NOTE] The promise returned by .onIdle() resolves once when the queue becomes idle. If you want to be notified every time the queue becomes idle, use the idle event instead: queue.on('idle', () => {}).

.onPendingZero()

Returns a promise that settles when all currently running tasks have completed; queue.pending === 0.

The difference with .onIdle is that .onPendingZero only waits for currently running tasks to finish, ignoring queued tasks. This is useful when you want to drain in-flight tasks before mutating shared state.

queue.pause();
await queue.onPendingZero();
// All running tasks have finished, though the queue may still have items

.onRateLimit()

Returns a promise that settles when the queue becomes rate-limited due to intervalCap. If the queue is already rate-limited, the promise resolves immediately.

Useful for implementing backpressure to prevent memory issues when producers are faster than consumers.

const queue = new PQueue({intervalCap: 5, interval: 1000});

// Add many tasks
for (let index = 0; index < 10; index++) {
    queue.add(() => someTask());
}

await queue.onRateLimit();
console.log('Queue is now rate-limited - time for maintenance tasks');

.onRateLimitCleared()

Returns a promise that settles when the queue is no longer rate-limited. If the queue is not currently rate-limited, the promise resolves immediately.

const queue = new PQueue({intervalCap: 5, interval: 1000});

// Wait for rate limiting to be cleared
await queue.onRateLimitCleared();
console.log('Rate limit cleared - can add more tasks');

.onError()

Returns a promise that rejects when any task in the queue errors.

Use with Promise.race([queue.onError(), queue.onIdle()]) to fail fast on the first error while still resolving normally when the queue goes idle.

[!IMPORTANT] The promise returned by add() still rejects. You must handle each add() promise (for example, .catch(() => {})) to avoid unhandled rejections.

import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 2});

queue.add(() => fetchData(1)).catch(() => {});
queue.add(() => fetchData(2)).catch(() => {});
queue.add(() => fetchData(3)).catch(() => {});

// Stop processing on first error
try {
    await Promise.race([
        queue.onError(),
        queue.onIdle()
    ]);
} catch (error) {
    queue.pause(); // Stop processing remaining tasks
    console.error('Queue failed:', error);
}

.onSizeLessThan(limit)

Returns a promise that settles when the queue size is less than the given limit: queue.size < limit.

If you want to avoid having the queue grow beyond a certain size you can await queue.onSizeLessThan() before adding a new item.

Note that this only limits the number of items waiting to start. There could still be up to concurrency jobs already running that this call does not include in its calculation.

.clear()

Clear the queue.

[!WARNING] Any promises returned by .add() for tasks that were waiting in the queue (not yet running) will never settle after calling .clear(). This can cause "unsettled top-level await" warnings or hang your process. If you need the promises to settle, use AbortSignal for cancellation instead — aborting rejects the .add() promise cleanly.

.size

Size of the queue, the number of queued items waiting to run.

.sizeBy(options)

Size of the queue, filtered by the given options.

For example, this can be used to find the number of items remaining in the queue with a specific priority level.

import PQueue from 'p-queue';

const queue = new PQueue();

queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦄', {priority: 0});
queue.add(async () => '🦄', {priority: 1});

console.log(queue.sizeBy({priority: 1}));
//=> 2

console.log(queue.sizeBy({priority: 0}));
//=> 1

.setPriority(id, priority)

Updates the priority of a promise function by its id, affecting its execution order. Requires a defined concurrency limit to take effect.

For example, this can be used to prioritize a promise function to run earlier.

import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 1});

queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦀', {priority: 0, id: '🦀'});
queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦄', {priority: 1});

queue.setPriority('🦀', 2);

In this case, the promise function with id: '🦀' runs second.

You can also deprioritize a promise function to delay its execution:

import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 1});

queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦀', {priority: 1, id: '🦀'});
queue.add(async () => '🦄');
queue.add(async () => '🦄', {priority: 0});

queue.setPriority('🦀', -1);

Here, the promise function with id: '🦀' executes last.

.pending

Number of running items (no longer in the queue).

.timeout

Type: number | undefined

Get or set the default timeout for all tasks. Can be changed at runtime.

Operations will throw a TimeoutError if they don't complete within the specified time.

The timeout begins when the operation is dequeued and starts execution, not while it's waiting in the queue.

const queue = new PQueue({timeout: 5000});

// Change timeout for all future tasks
queue.timeout = 10000;

.concurrency

.isPaused

Whether the queue is currently paused.

.isRateLimited

Whether the queue is currently rate-limited due to intervalCap. Returns true when the number of tasks executed in the current interval has reached the intervalCap and there are still tasks waiting to be processed.

.isSaturated

Whether the queue is saturated. Returns true when: - All concurrency slots are occupied and tasks are waiting, OR - The queue is rate-limited

Core symbols most depended-on inside this repo

add
called by 434
source/index.ts
onIdle
called by 77
source/index.ts
dequeue
called by 31
source/priority-queue.ts
enqueue
called by 26
source/priority-queue.ts
start
called by 25
source/index.ts
createRun
called by 23
test/priority-queue.ts
clear
called by 19
source/index.ts
onPendingZero
called by 16
source/index.ts

Shape

Method 56
Function 17
Class 4

Languages

TypeScript100%

Modules by API surface

source/index.ts51 symbols
source/priority-queue.ts9 symbols
test/basic.ts5 symbols
bench.ts4 symbols
test/strict.ts3 symbols
test/rate-limit.ts2 symbols
test/priority-queue.ts1 symbols
test/advanced.ts1 symbols
source/lower-bound.ts1 symbols

Dependencies from manifests, versioned

@sindresorhus/tsconfig8.0.1 · 1×
@types/benchmark2.1.5 · 1×
@types/node25.6.0 · 1×
benchmark2.1.4 · 1×
del-cli6.0.0 · 1×
delay6.0.0 · 1×
eventemitter35.0.4 · 1×
in-range3.0.0 · 1×
p-defer4.0.1 · 1×
p-timeout7.0.0 · 1×
random-int3.1.0 · 1×
time-span5.1.0 · 1×

For agents

$ claude mcp add p-queue \
  -- python -m otcore.mcp_server <graph>

⬇ download graph artifact