| 110 | } |
| 111 | |
| 112 | function benchIter(chunk, numConsumers, datasize, n, totalOps) { |
| 113 | const { broadcast } = require('stream/iter'); |
| 114 | |
| 115 | // No-op consumer: drain all batches without collecting |
| 116 | async function drain(source) { |
| 117 | // eslint-disable-next-line no-unused-vars |
| 118 | for await (const _ of source) { /* drain */ } |
| 119 | } |
| 120 | |
| 121 | async function run() { |
| 122 | const { writer, broadcast: bc } = broadcast(); |
| 123 | const consumers = []; |
| 124 | for (let c = 0; c < numConsumers; c++) { |
| 125 | consumers.push(drain(bc.push())); |
| 126 | } |
| 127 | |
| 128 | let remaining = datasize; |
| 129 | while (remaining > 0) { |
| 130 | const size = Math.min(remaining, chunk.length); |
| 131 | const buf = size === chunk.length ? chunk : chunk.subarray(0, size); |
| 132 | if (!writer.writeSync(buf)) { |
| 133 | await writer.write(buf); |
| 134 | } |
| 135 | remaining -= size; |
| 136 | } |
| 137 | writer.endSync(); |
| 138 | |
| 139 | await Promise.all(consumers); |
| 140 | } |
| 141 | |
| 142 | (async () => { |
| 143 | bench.start(); |
| 144 | for (let i = 0; i < n; i++) await run(); |
| 145 | bench.end(totalOps); |
| 146 | })(); |
| 147 | } |