/**
 * Called after every completion or worker spawn — dispatches queued
 * executions to available workers.
 */
function drainQueue() {
  // Keep going only while work is queued and the global execution cap has headroom.
  while (queueLength() > 0 && totalActiveExecutions < MAX_CONCURRENT) {
    const target = selectWorker()
    if (!target) {
      // No worker was selected. Count the workers not flagged as retiring,
      // add spawns still in flight, and start one more worker if that total
      // is below POOL_SIZE. Either way this pass ends here; a successful
      // spawn re-enters drainQueue on its own.
      let liveWorkers = 0
      for (const candidate of workers.values()) {
        if (!candidate.retiring) liveWorkers += 1
      }
      if (liveWorkers + spawnInProgress < POOL_SIZE) {
        const onSpawnFailure = (err) => {
          logger.error('Failed to spawn worker during drain', { err })
          scheduleDrainRetry()
        }
        spawnWorker()
          .then(() => drainQueue())
          .catch(onSpawnFailure)
      }
      return
    }

    const dispatchOwner = selectOwnerForDispatch()
    if (!dispatchOwner) {
      // No owner was selected for dispatch; schedule a retry instead of spinning.
      scheduleDrainRetry()
      return
    }

    const next = shiftQueuedExecutionForOwner(dispatchOwner)
    if (!next) {
      // The chosen owner yielded no queued execution: zero its burst counter,
      // give maybeCleanupOwner a chance to act on it, and re-evaluate the loop.
      dispatchOwner.burstRemaining = 0
      maybeCleanupOwner(dispatchOwner.ownerKey)
      continue
    }

    const { queueTimeout, state, req, resolve, brokers } = next
    clearTimeout(queueTimeout)
    // Clearing queueId: from here on, abort must reach the worker, not the queue.
    state.queueId = undefined
    dispatchToWorker(target, dispatchOwner, req, resolve, state, brokers)
  }
}
| 1300 | |
| 1301 | /** |
| 1302 | * Execute JavaScript code in an isolated V8 isolate via Node.js subprocess. |
no test coverage detected