(src)
| 50203 | } |
| 50204 | |
// Pump chunks out of `src` into whatever is currently piped to it.
//
// Each chunk obtained from src.read() is written to every destination in
// the readable state's pipe set and then re-emitted on `src` as a 'data'
// event. The pump stops as soon as one of the following happens:
//   - a destination's write() returned false: we return and leave the
//     awaitDrain counter set to the number of such destinations,
//   - no destinations remain: flowing is switched off,
//   - src.read() returned null: ranOut is flagged.
function flow(src) {
  var rs = src._readableState;
  var data;

  // every pass starts with a clean backpressure count.
  rs.awaitDrain = 0;

  // hand the current chunk to one destination, counting it as "needs a
  // drain" only when write() returns exactly false. Extra arguments passed
  // by forEach are ignored.
  function pushTo(dest) {
    if (dest.write(data) === false)
      rs.awaitDrain++;
  }

  // pipesCount is re-checked on every pass, and read() is only called
  // while at least one destination is still attached.
  while (rs.pipesCount) {
    data = src.read();
    if (data === null)
      break;

    // with exactly one pipe, state.pipes is passed straight to write as
    // the destination; otherwise it is iterated as a list.
    if (rs.pipesCount !== 1)
      forEach(rs.pipes, pushTo);
    else
      pushTo(rs.pipes);

    src.emit('data', data);

    // somebody pushed back, so stop here until they drain.
    if (rs.awaitDrain > 0)
      return;
  }

  // destinations are still attached and nobody asked for a drain, so we
  // simply ran out of data. Flag it so that the next readable event
  // starts things over again.
  if (rs.pipesCount !== 0) {
    rs.ranOut = true;
    return;
  }

  // every destination was unpiped, either before we got here or while
  // looping, so stop flowing.
  //
  // NB: This is a pretty rare edge case.
  rs.flowing = false;

  // if data event listeners were added, switch over to old mode.
  if (EE.listenerCount(src, 'data') > 0)
    emitDataEvents(src);
}
| 50248 | |
| 50249 | function pipeOnReadable() { |
| 50250 | if (this._readableState.ranOut) { |
no test coverage detected