MCPcopy
hub / github.com/redis/node-redis / _executePipeline

Method _executePipeline

packages/client/lib/client/index.ts:1509–1560  ·  view source on GitHub ↗

* @internal

(
    commands: Array<RedisMultiQueuedCommand>,
    selectedDB?: number
  )

Source from the content-addressed store, hash-verified

1507 * @internal
1508 */
1509 async _executePipeline(
1510 commands: Array<RedisMultiQueuedCommand>,
1511 selectedDB?: number
1512 ) {
1513 if (!this._self.#socket.isOpen) {
1514 return Promise.reject(new ClientClosedError());
1515 }
1516
1517 const batchSize = commands.length;
1518
1519 return trace(CHANNELS.TRACE_BATCH,
1520 async () => {
1521 const chainId = Symbol('Pipeline Chain');
1522 const promise = Promise.all(
1523 commands.map(({ args }) => {
1524 const traced = trace(CHANNELS.TRACE_COMMAND,
1525 () => this._self.#queue.addCommand(args, {
1526 chainId,
1527 typeMapping: this._commandOptions?.typeMapping
1528 }),
1529 () => ({
1530 ...this._self.#commandTraceContext(args),
1531 batchMode: 'PIPELINE' as const,
1532 batchSize
1533 })
1534 );
1535 // Prevent unhandled rejection from tracePromise wrapper; individual
1536 // rejections are collected by Promise.all, but the tracePromise wrapper
1537 // is a separate branch that nobody awaits.
1538 traced.catch(noop);
1539 return traced;
1540 })
1541 );
1542 this._self.#scheduleWrite();
1543
1544 const result = await promise;
1545
1546 if (selectedDB !== undefined) {
1547 this._self.#selectedDB = selectedDB;
1548 }
1549
1550 return result;
1551 },
1552 () => ({
1553 batchMode: 'PIPELINE' as const,
1554 batchSize,
1555 database: this._self.#selectedDB,
1556 clientId: this._self._clientId,
1557 ...this._self.#socketTraceContext()
1558 })
1559 );
1560 }
1561
1562 /**
1563 * @internal

Callers 2

MULTIMethod · 0.45
MULTIMethod · 0.45

Calls 6

traceFunction · 0.90
#commandTraceContextMethod · 0.80
#scheduleWriteMethod · 0.80
#socketTraceContextMethod · 0.80
rejectMethod · 0.65
addCommandMethod · 0.45

Tested by

no test coverage detected