| 47186 | // Create a stream that is both readable and writable. |
| 47187 | |
| 47188 | function through (write, end, opts) { |
// Default handlers: when no `write` is given, each written chunk is queued
// unchanged; when no `end` is given, null is queued (drain() treats a null
// buffer entry as the end marker and emits 'end').
| 47189 | write = write || function (data) { this.queue(data) } |
| 47190 | end = end || function () { this.queue(null) } |
| 47191 | |
// `buffer` holds queued entries that have not been emitted yet; `_ended` is
// set once null has been queued.
// NOTE(review): `ended` and `destroyed` are not read in the lines visible
// here -- presumably used further down in this function; confirm.
| 47192 | var ended = false, destroyed = false, buffer = [], _ended = false |
| 47193 | var stream = new Stream() |
| 47194 | stream.readable = stream.writable = true |
| 47195 | stream.paused = false |
| 47196 | |
| 47197 | // stream.autoPause = !(opts && opts.autoPause === false) |
// autoDestroy is true unless the caller passes opts.autoDestroy === false.
| 47198 | stream.autoDestroy = !(opts && opts.autoDestroy === false) |
| 47199 | |
// Writable-side entry point: hands `data` to the `write` handler, invoked with
// the same `this` that write() was called on.
// Returns false while stream.paused is set and true otherwise.
// NOTE(review): the false return presumably signals back-pressure to the
// writer -- the code that sets/clears `paused` is outside this view; confirm.
| 47200 | stream.write = function (data) { |
| 47201 | write.call(this, data) |
| 47202 | return !stream.paused |
| 47203 | } |
| 47204 | |
// Emits buffered entries in FIFO order until the buffer is empty or the
// stream is paused. A null entry is the end marker: 'end' is emitted and
// draining stops immediately, leaving any remaining entries in the buffer.
| 47205 | function drain() { |
// The paused flag is re-checked before every entry, so a 'data' listener that
// sets stream.paused stops the loop before the next entry is emitted.
| 47206 | while(buffer.length && !stream.paused) { |
| 47207 | var data = buffer.shift() |
| 47208 | if(null === data) |
| 47209 | return stream.emit('end') |
| 47210 | else |
| 47211 | stream.emit('data', data) |
| 47212 | } |
| 47213 | } |
| 47214 | |
// Appends `data` to the buffer and drains right away. The same function is
// exposed as both stream.queue and stream.push, and it returns the stream.
// Queuing null marks the stream as ended: the null itself is still buffered
// (drain() turns it into an 'end' event), and every later call is a no-op
// that just returns the stream.
| 47215 | stream.queue = stream.push = function (data) { |
| 47216 | // console.error(ended) |
| 47217 | if(_ended) return stream |
| 47218 | if(data === null) _ended = true |
| 47219 | buffer.push(data) |
| 47220 | drain() |
| 47221 | return stream |
| 47222 | } |
| 47223 | |
| 47224 | // This is registered as the first 'end' listener. |
| 47225 | // destroy() must be called on the next tick, to make sure it happens after |
| 47226 | // any stream piped from here has handled 'end'. |
| 47227 | // This is only a problem if 'end' is not emitted synchronously. |
| 47228 | // A nicer way to do this would be to make sure this is the last listener for 'end'. |
| 47229 | |
// 'end' listener: marks the stream as no longer readable. If the writable side
// is already closed (stream.writable is false) and autoDestroy is on,
// stream.destroy() is scheduled with process.nextTick instead of being called
// synchronously inside the 'end' emission.
// NOTE(review): stream.destroy is not defined in the lines visible here.
| 47230 | stream.on('end', function () { |
| 47231 | stream.readable = false |
| 47232 | if(!stream.writable && stream.autoDestroy) |
| 47233 | process.nextTick(function () { |
| 47234 | stream.destroy() |
| 47235 | }) |
| 47236 | }) |
| 47237 | |
// Closes the writable side: clears stream.writable, runs the `end` handler
// with the stream as `this`, then calls destroy() immediately if the readable
// side has already ended and autoDestroy is on.
| 47238 | function _end () { |
// writable is cleared before the handler runs, so an 'end' event emitted
// synchronously from inside the handler already sees the writable side closed.
| 47239 | stream.writable = false |
| 47240 | end.call(stream) |
// stream.readable is set to false by the 'end' listener; that can happen
// synchronously inside end.call() above (queue(null) -> drain() -> emit('end')
// when the stream is not paused). In that case the listener has also scheduled
// a next-tick destroy(), so destroy() can be called twice.
// NOTE(review): presumably destroy() tolerates repeated calls -- it is outside
// this view; confirm.
| 47241 | if(!stream.readable && stream.autoDestroy) |
| 47242 | stream.destroy() |
| 47243 | } |
| 47244 | |
| 47245 | stream.end = function (data) { |