Written in TypeScript.
For Node.js 20.x and higher.
MIT Licensed.
Contents: Piscina · new Piscina([options]) · run(task[, options]) · destroy() · close([options]) · 'error' · 'drain' · 'needsDrain' · 'message' · completed (readonly) · duration (readonly) · options (readonly) · runTime (readonly) · threads (readonly) · queueSize (readonly) · needsDrain (readonly) · utilization (readonly) · waitTime (readonly) · isWorkerThread (readonly) · version (readonly) · move(value) · Transferable

In main.js:
const path = require("path");
const Piscina = require("piscina");
// Create a pool whose worker threads run the handler exported by worker.js.
const workerFile = path.resolve(__dirname, "worker.js");
const piscina = new Piscina({ filename: workerFile });

/**
 * Submits a single task to the pool and prints the value the worker returned.
 */
async function main() {
  const sum = await piscina.run({ a: 4, b: 6 });
  console.log(sum); // Prints 10
}

main();
In worker.js:
module.exports = ({ a, b }) => {
return a + b;
};
The worker may also be an async function or may return a Promise:
const { setTimeout } = require("timers/promises");
module.exports = async ({ a, b }) => {
// Fake some async activity
await setTimeout(100);
return a + b;
};
ESM is also supported for both Piscina and workers:
import { Piscina } from "piscina";
// The worker location must be given as a file:// URL; resolve it relative to
// this module.
const workerUrl = new URL("./worker.mjs", import.meta.url);
const piscina = new Piscina({ filename: workerUrl.href });

// Top-level await is available here because this file is an ES module.
const sum = await piscina.run({ a: 4, b: 6 });
console.log(sum); // Prints 10
In worker.mjs:
// Default export is the task handler: returns the sum of the task's a and b.
export default ({ a, b }) => a + b;
A single worker file may export multiple named handler functions.
"use strict";
function add({ a, b }) {
return a + b;
}
function multiply({ a, b }) {
return a * b;
}
add.add = add;
add.multiply = multiply;
module.exports = add;
The export to target can then be specified when the task is submitted:
"use strict";
const Piscina = require("piscina");
const { resolve } = require("path");
const piscina = new Piscina({ filename: resolve(__dirname, "worker.js") });

/**
 * Runs the same input through two differently named exports of worker.js.
 */
async function main() {
  // The `name` option selects which export of the worker handles the task.
  const addTask = piscina.run({ a: 4, b: 6 }, { name: "add" });
  const multiplyTask = piscina.run({ a: 4, b: 6 }, { name: "multiply" });

  // Both tasks were submitted above; wait for them together.
  const res = await Promise.all([addTask, multiplyTask]);
}

main();
Submitted tasks may be canceled using either an AbortController or
an EventEmitter:
"use strict";
const Piscina = require("piscina");
const { resolve } = require("path");
const piscina = new Piscina({ filename: resolve(__dirname, "worker.js") });

/**
 * Submits a task, aborts it through an AbortController, and reports the
 * resulting rejection.
 */
async function main() {
  const abortController = new AbortController();
  try {
    // Hand the controller's signal to the task so it can be canceled later.
    const pending = piscina.run(
      { a: 4, b: 6 },
      { signal: abortController.signal },
    );
    abortController.abort();
    await pending;
  } catch (err) {
    console.log("The task was canceled");
  }
}

main();
Alternatively, any EventEmitter that emits an 'abort' event
may be used as an abort controller:
"use strict";
const Piscina = require("piscina");
const EventEmitter = require("events");
const { resolve } = require("path");
const piscina = new Piscina({ filename: resolve(__dirname, "worker.js") });

/**
 * Submits a task and cancels it by emitting 'abort' on a plain EventEmitter
 * that was passed as the task's `signal`.
 */
async function main() {
  const ee = new EventEmitter();
  try {
    const pending = piscina.run({ a: 4, b: 6 }, { signal: ee });
    // Emitting 'abort' plays the role of AbortController#abort().
    ee.emit("abort");
    await pending;
  } catch (err) {
    console.log("The task was canceled");
  }
}

main();
A worker thread will not be made available to process tasks until Piscina determines that it is "ready". By default, a worker is ready as soon as Piscina loads it and acquires a reference to the exported handler function.
There may be times when the availability of a worker may need to be delayed
longer while the worker initializes any resources it may need to operate.
To support this case, the worker module may export a Promise that resolves
to the handler function as opposed to exporting the function directly:
async function initialize() {
await someAsyncInitializationActivity();
return ({ a, b }) => a + b;
}
module.exports = initialize();
Piscina will await the resolution of the exported Promise before marking the worker thread available.
When the maxQueue option is set, once the Piscina queue is full, no
additional tasks may be submitted until the queue size falls below the
limit. The 'drain' event may be used to receive notification when the
queue is empty and all tasks have been submitted to workers for processing.
Example: Using a Node.js stream to feed a Piscina worker pool:
"use strict";
const { resolve } = require("path");
const Pool = require("../..");
// Pool with a bounded queue. With maxQueue "auto" the limit is calculated by
// Piscina and can be read back from pool.options.maxQueue.
const pool = new Pool({
  filename: resolve(__dirname, "worker.js"),
  maxQueue: "auto",
});

const stream = getStreamSomehow();
stream.setEncoding("utf8");

// Number of chunks handed to the pool so far; used only in the log output.
let counter = 0;

// 'drain' fires when the queue has emptied: let the stream flow again.
pool.on("drain", () => {
  if (stream.isPaused()) {
    console.log("resuming...", counter, pool.queueSize);
    stream.resume();
  }
});

stream
  .on("data", (data) => {
    counter++;
    // Report task failures instead of leaving the promise unhandled.
    pool.run(data).catch(console.error);
    // Apply backpressure: stop reading once the queue has reached its limit.
    if (pool.queueSize === pool.options.maxQueue) {
      console.log("pausing...", counter, pool.queueSize);
      stream.pause();
    }
  })
  .on("error", console.error)
  .on("end", () => {
    console.log("done");
  });
A worker thread is only active until the moment it returns a result, which may be either the result of a synchronous call or a Promise that will be fulfilled or rejected in the future. Once this is done, Piscina will wait for stdout and stderr to be flushed, and then pause the worker's event loop until the next call. Piscina has no way of detecting async code that was scheduled but not awaited before returning, so execution of that code will only resume on the next call. Thus, it is highly recommended to properly handle all async tasks before returning a result; otherwise your code may behave unpredictably.
For example:
const { setTimeout } = require("timers/promises");
module.exports = ({ a, b }) => {
// This promise should be awaited
setTimeout(1000).then(() => {
console.log("Working"); // This will **not** run during the same worker call
});
return a + b;
};
Piscina supports broadcast communication via BroadcastChannel (Node v18+). In the following example, the main thread sends a message and the worker threads receive it.
In main.js
"use strict";
const { BroadcastChannel } = require("worker_threads");
const { resolve } = require("path");
const Piscina = require("piscina");
const piscina = new Piscina({
  filename: resolve(__dirname, "worker.js"),
  atomics: "disabled",
});

/**
 * Starts two worker tasks, broadcasts one message to them after a second,
 * then waits for both tasks and releases the channel.
 *
 * @returns {Promise<void>} Settles once both tasks have settled.
 */
async function main() {
  const bc = new BroadcastChannel("my_channel");
  // Start the workers. Deliberately not awaited yet: the message below must be
  // posted while the tasks are still running.
  const tasks = Promise.all([piscina.run("thread 1"), piscina.run("thread 2")]);
  // post message in one second
  const timer = setTimeout(() => {
    bc.postMessage("Main thread message");
  }, 1000);
  try {
    await tasks;
  } finally {
    // Cancel a still-pending post (tasks failed early) and close the channel;
    // an open BroadcastChannel is a ref'd handle and can keep the process alive.
    clearTimeout(timer);
    bc.close();
  }
}

// Surface task failures instead of leaving the rejection unhandled.
main().catch(console.error);
In worker.js
"use strict";
const { BroadcastChannel } = require("worker_threads");
module.exports = async (thread) => {
const bc = new BroadcastChannel("my_channel");
bc.onmessage = (event) => {
console.log(thread + " Received from:" + event.data);
};
await new Promise((resolve) => {
setTimeout(resolve, 2000);
});
};
Additional examples can be found in the GitHub repo at https://github.com/piscinajs/piscina/tree/master/examples
Class: Piscina
Piscina works by creating a pool of Node.js Worker Threads to which one or more tasks may be dispatched. Each worker thread executes a single exported function defined in a separate file. Whenever a task is dispatched to a worker, the worker invokes the exported function and reports the return value back to Piscina when the function completes.
This class extends EventEmitter from Node.js.
Constructor: new Piscina([options])
The following optional configuration is supported:
filename: (string | null) Provides the default source for the code that
runs the tasks on Worker threads. This should be an absolute path or an
absolute file:// URL to a file that exports a JavaScript function or
async function as its default export or module.exports. ES modules
are supported.
name: (string | null) Provides the name of the default exported worker
function. The default is 'default', indicating the default export of the
worker module.
minThreads: (number) Sets the minimum number of threads that are always
running for this thread pool. The default is the number provided by os.availableParallelism.
maxThreads: (number) Sets the maximum number of threads that are
running for this thread pool. The default is the number provided by os.availableParallelism * 1.5.
idleTimeout: (number) A timeout in milliseconds that specifies how long
a Worker is allowed to be idle, i.e. not handling any tasks, before it is
shut down. By default, this is immediate. If Infinity is passed as the value,
the Worker never shuts down. Be careful when using Infinity,
as it can lead to resource overuse. Tip: The default idleTimeout
can lead to some performance loss in the application because of the overhead
involved with stopping and starting new worker threads. To improve performance,
try setting the idleTimeout explicitly.
maxQueue: (number | string) The maximum number of tasks that may be
scheduled to run, but not yet running due to lack of available threads, at
a given time. By default, there is no limit. The special value 'auto'
may be used to have Piscina calculate the maximum as the square of maxThreads.
When 'auto' is used, the calculated maxQueue value may be found by checking
the options.maxQueue property.
concurrentTasksPerWorker: (number) Specifies how many tasks can share
a single Worker thread simultaneously. The default is 1. This generally
only makes sense to specify if there is some kind of asynchronous component
to the task. Keep in mind that Worker threads are generally not built for
handling I/O in parallel.
atomics: (sync | async | disabled) Use the Atomics API for faster communication
between threads. This is on by default. You can disable Atomics globally by
setting the environment variable PISCINA_DISABLE_ATOMICS to 1.
If atomics is sync, it will cause threads to pause (stopping all execution)
between tasks. Ideally, threads should wait for all operations to finish before
returning control to the main thread (avoid having open handles within a thread). If still want to have the possibilit$ claude mcp add piscina \
-- python -m otcore.mcp_server <graph>