* @internal
(
commands: Array<RedisMultiQueuedCommand>,
selectedDB?: number
)
| 1507 | * @internal |
| 1508 | */ |
| 1509 | async _executePipeline( /* Queues every command under one shared chain id, schedules a write, and resolves with all replies. */ |
| 1510 | commands: Array<RedisMultiQueuedCommand>, /* only each entry's args is read in this method */ |
| 1511 | selectedDB?: number /* when defined, stored in #selectedDB after every reply has resolved */ |
| 1512 | ) { |
| 1513 | if (!this._self.#socket.isOpen) { /* reject up front when the socket is not open */ |
| 1514 | return Promise.reject(new ClientClosedError()); |
| 1515 | } |
| 1516 | |
| 1517 | const batchSize = commands.length; /* reported in both the batch and the per-command trace contexts */ |
| 1518 | |
| 1519 | return trace(CHANNELS.TRACE_BATCH, |
| 1520 | async () => { |
| 1521 | const chainId = Symbol('Pipeline Chain'); /* one symbol shared by every addCommand call below */ |
| 1522 | const promise = Promise.all( /* rejects as soon as any mapped promise rejects */ |
| 1523 | commands.map(({ args }) => { |
| 1524 | const traced = trace(CHANNELS.TRACE_COMMAND, |
| 1525 | () => this._self.#queue.addCommand(args, { |
| 1526 | chainId, |
| 1527 | typeMapping: this._commandOptions?.typeMapping |
| 1528 | }), |
| 1529 | () => ({ |
| 1530 | ...this._self.#commandTraceContext(args), |
| 1531 | batchMode: 'PIPELINE' as const, |
| 1532 | batchSize |
| 1533 | }) |
| 1534 | ); |
| 1535 | // Prevent unhandled rejection from tracePromise wrapper; individual |
| 1536 | // rejections are collected by Promise.all, but the tracePromise wrapper |
| 1537 | // is a separate branch that nobody awaits. |
| 1538 | traced.catch(noop); |
| 1539 | return traced; |
| 1540 | }) |
| 1541 | ); |
| 1542 | this._self.#scheduleWrite(); /* called once, after the map over commands has returned */ |
| 1543 | |
| 1544 | const result = await promise; /* a rejection here skips the selectedDB update below */ |
| 1545 | |
| 1546 | if (selectedDB !== undefined) { |
| 1547 | this._self.#selectedDB = selectedDB; |
| 1548 | } |
| 1549 | |
| 1550 | return result; /* array of replies in command order, as produced by Promise.all */ |
| 1551 | }, |
| 1552 | () => ({ /* context object supplied to the TRACE_BATCH trace call */ |
| 1553 | batchMode: 'PIPELINE' as const, |
| 1554 | batchSize, |
| 1555 | database: this._self.#selectedDB, |
| 1556 | clientId: this._self._clientId, |
| 1557 | ...this._self.#socketTraceContext() |
| 1558 | }) |
| 1559 | ); |
| 1560 | } |
| 1561 | |
| 1562 | /** |
| 1563 | * @internal |
no test coverage detected