(handle, stream, source)
| 1479 | } |
| 1480 | |
/**
 * Feeds a synchronous source into `handle` one batch at a time, awaiting
 * `writeBatchWithDrain()` between batches so iteration can pause.
 *
 * The source iterator is closed (its `return()` method is invoked, when it
 * has one) whenever iteration is abandoned before exhaustion: the stream was
 * destroyed, `writeBatchWithDrain()` asked to stop, or a write threw. This
 * lets generator-based sources run their `finally` cleanup, matching what a
 * `for...of` loop would do.
 *
 * @param {object} handle Must provide `initStreamingSource()` and
 *   `endWrite()`; also forwarded to `writeBatchWithDrain()`.
 * @param {object} stream Must expose `destroyed` and `destroy(err)`.
 * @param {*} source Anything accepted by `streamFromSync()`.
 * @returns {Promise<void>} Resolves once the source is fully written or
 *   writing stops early. Rejects only when an error occurs while the stream
 *   is already destroyed.
 */
async function consumeSyncSource(handle, stream, source) {
  handle.initStreamingSource();
  initStreamingBackpressure(stream);
  // Normalize to Iterable<Uint8Array[]>. Manually iterate so the destroyed
  // check runs before each next() call and no batch is pulled needlessly.
  const normalized = streamFromSync(source);
  const iter = normalized[SymbolIterator]();

  // True while the iterator may still hold resources that need releasing.
  let iterOpen = true;
  const closeIterator = () => {
    if (!iterOpen) return;
    iterOpen = false;
    if (typeof iter.return === 'function') iter.return();
  };

  try {
    while (true) {
      if (stream.destroyed) {
        closeIterator();
        return;
      }
      let step;
      try {
        step = iter.next();
      } catch (err) {
        // An iterator whose next() throws is considered finished; the
        // iteration protocol does not call return() on it afterwards.
        iterOpen = false;
        throw err;
      }
      const { value: batch, done } = step;
      if (done) {
        // Exhausted on its own: nothing left to close.
        iterOpen = false;
        break;
      }
      // A truthy result means writing must stop without ending the handle.
      if (await writeBatchWithDrain(handle, stream, batch)) {
        closeIterator();
        return;
      }
    }
    handle.endWrite();
  } catch (err) {
    try {
      closeIterator();
    } catch {
      // A failure while closing must not mask the error that caused the
      // abort; `err` is what gets reported (same rule as for...of).
    }
    if (!stream.destroyed) {
      stream.destroy(err);
    } else {
      // If the stream is already destroyed, rethrow the error to avoid
      // silently swallowing it. In practice this should not happen.
      throw err;
    }
  }
}
| 1506 | |
| 1507 | function isAsyncIterable(obj) { |
| 1508 | return obj != null && typeof obj[SymbolAsyncIterator] === 'function'; |
no test coverage detected
searching dependent graphs…