* @param {AbortCallback} abort * @param {import('stream').Stream} body * @param {import('./client.js')} client * @param {import('../core/request.js')} request * @param {import('net').Socket} socket * @param {number} contentLength * @param {string} header * @param {boolean} expectsPayload
(abort, body, client, request, socket, contentLength, header, expectsPayload)
| 1336 | * @param {boolean} expectsPayload |
| 1337 | */ |
| 1338 | function writeStream (abort, body, client, request, socket, contentLength, header, expectsPayload) { |
| 1339 | assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined') |
| 1340 | |
| 1341 | let finished = false |
| 1342 | |
| 1343 | const writer = new AsyncWriter({ abort, socket, request, contentLength, client, expectsPayload, header }) |
| 1344 | |
| 1345 | /** |
| 1346 | * @param {Buffer} chunk |
| 1347 | * @returns {void} |
| 1348 | */ |
| 1349 | const onData = function (chunk) { |
| 1350 | if (finished) { |
| 1351 | return |
| 1352 | } |
| 1353 | |
| 1354 | try { |
| 1355 | if (!writer.write(chunk) && this.pause) { |
| 1356 | this.pause() |
| 1357 | } |
| 1358 | } catch (err) { |
| 1359 | util.destroy(this, err) |
| 1360 | } |
| 1361 | } |
| 1362 | |
| 1363 | /** |
| 1364 | * @returns {void} |
| 1365 | */ |
| 1366 | const onDrain = function () { |
| 1367 | if (finished) { |
| 1368 | return |
| 1369 | } |
| 1370 | |
| 1371 | if (body.resume) { |
| 1372 | body.resume() |
| 1373 | } |
| 1374 | } |
| 1375 | |
| 1376 | /** |
| 1377 | * @returns {void} |
| 1378 | */ |
| 1379 | const onClose = function () { |
| 1380 | // 'close' might be emitted *before* 'error' for |
| 1381 | // broken streams. Wait a tick to avoid this case. |
| 1382 | queueMicrotask(() => { |
| 1383 | // It's only safe to remove 'error' listener after |
| 1384 | // 'close'. |
| 1385 | body.removeListener('error', onFinished) |
| 1386 | }) |
| 1387 | |
| 1388 | if (!finished) { |
| 1389 | const err = new RequestAbortedError() |
| 1390 | queueMicrotask(() => onFinished(err)) |
| 1391 | } |
| 1392 | } |
| 1393 | |
| 1394 | /** |
| 1395 | * @param {Error} [err] |