(src)
| 3398 | } |
| 3399 | |
| 3400 | function flow(src) { |
| 3401 | var state = src._readableState; |
| 3402 | var chunk; |
| 3403 | state.awaitDrain = 0; |
| 3404 | |
| 3405 | function write(dest, i, list) { |
| 3406 | var written = dest.write(chunk); |
| 3407 | if (false === written) { |
| 3408 | state.awaitDrain++; |
| 3409 | } |
| 3410 | } |
| 3411 | |
| 3412 | while (state.pipesCount && null !== (chunk = src.read())) { |
| 3413 | |
| 3414 | if (state.pipesCount === 1) |
| 3415 | write(state.pipes, 0, null); |
| 3416 | else |
| 3417 | forEach(state.pipes, write); |
| 3418 | |
| 3419 | src.emit('data', chunk); |
| 3420 | |
| 3421 | // if anyone needs a drain, then we have to wait for that. |
| 3422 | if (state.awaitDrain > 0) |
| 3423 | return; |
| 3424 | } |
| 3425 | |
| 3426 | // if every destination was unpiped, either before entering this |
| 3427 | // function, or in the while loop, then stop flowing. |
| 3428 | // |
| 3429 | // NB: This is a pretty rare edge case. |
| 3430 | if (state.pipesCount === 0) { |
| 3431 | state.flowing = false; |
| 3432 | |
| 3433 | // if there were data event listeners added, then switch to old mode. |
| 3434 | if (EE.listenerCount(src, 'data') > 0) |
| 3435 | emitDataEvents(src); |
| 3436 | return; |
| 3437 | } |
| 3438 | |
| 3439 | // at this point, no one needed a drain, so we just ran out of data |
| 3440 | // on the next readable event, start it over again. |
| 3441 | state.ranOut = true; |
| 3442 | } |
| 3443 | |
| 3444 | function pipeOnReadable() { |
| 3445 | if (this._readableState.ranOut) { |
no test coverage detected