(src)
| 3518 | } |
| 3519 | |
| 3520 | function flow(src) { |
| 3521 | var state = src._readableState; |
| 3522 | var chunk; |
| 3523 | state.awaitDrain = 0; |
| 3524 | |
| 3525 | function write(dest, i, list) { |
| 3526 | var written = dest.write(chunk); |
| 3527 | if (false === written) { |
| 3528 | state.awaitDrain++; |
| 3529 | } |
| 3530 | } |
| 3531 | |
| 3532 | while (state.pipesCount && null !== (chunk = src.read())) { |
| 3533 | |
| 3534 | if (state.pipesCount === 1) |
| 3535 | write(state.pipes, 0, null); |
| 3536 | else |
| 3537 | forEach(state.pipes, write); |
| 3538 | |
| 3539 | src.emit('data', chunk); |
| 3540 | |
| 3541 | // if anyone needs a drain, then we have to wait for that. |
| 3542 | if (state.awaitDrain > 0) |
| 3543 | return; |
| 3544 | } |
| 3545 | |
| 3546 | // if every destination was unpiped, either before entering this |
| 3547 | // function, or in the while loop, then stop flowing. |
| 3548 | // |
| 3549 | // NB: This is a pretty rare edge case. |
| 3550 | if (state.pipesCount === 0) { |
| 3551 | state.flowing = false; |
| 3552 | |
| 3553 | // if there were data event listeners added, then switch to old mode. |
| 3554 | if (EE.listenerCount(src, 'data') > 0) |
| 3555 | emitDataEvents(src); |
| 3556 | return; |
| 3557 | } |
| 3558 | |
| 3559 | // at this point, no one needed a drain, so we just ran out of data |
| 3560 | // on the next readable event, start it over again. |
| 3561 | state.ranOut = true; |
| 3562 | } |
| 3563 | |
| 3564 | function pipeOnReadable() { |
| 3565 | if (this._readableState.ranOut) { |
no test coverage detected