| 501 | //create a readable writable stream. |
| 502 | |
| 503 | function through (write, end, opts) { |
| 504 | write = write || function (data) { this.queue(data) } |
| 505 | end = end || function () { this.queue(null) } |
| 506 | |
| 507 | var ended = false, destroyed = false, buffer = [], _ended = false |
| 508 | var stream = new Stream() |
| 509 | stream.readable = stream.writable = true |
| 510 | stream.paused = false |
| 511 | |
| 512 | // stream.autoPause = !(opts && opts.autoPause === false) |
| 513 | stream.autoDestroy = !(opts && opts.autoDestroy === false) |
| 514 | |
| 515 | stream.write = function (data) { |
| 516 | write.call(this, data) |
| 517 | return !stream.paused |
| 518 | } |
| 519 | |
| 520 | function drain() { |
| 521 | while(buffer.length && !stream.paused) { |
| 522 | var data = buffer.shift() |
| 523 | if(null === data) |
| 524 | return stream.emit('end') |
| 525 | else |
| 526 | stream.emit('data', data) |
| 527 | } |
| 528 | } |
| 529 | |
| 530 | stream.queue = stream.push = function (data) { |
| 531 | // console.error(ended) |
| 532 | if(_ended) return stream |
| 533 | if(data === null) _ended = true |
| 534 | buffer.push(data) |
| 535 | drain() |
| 536 | return stream |
| 537 | } |
| 538 | |
| 539 | //this will be registered as the first 'end' listener |
| 540 | //must call destroy next tick, to make sure we're after any |
| 541 | //stream piped from here. |
| 542 | //this is only a problem if end is not emitted synchronously. |
| 543 | //a nicer way to do this is to make sure this is the last listener for 'end' |
| 544 | |
| 545 | stream.on('end', function () { |
| 546 | stream.readable = false |
| 547 | if(!stream.writable && stream.autoDestroy) |
| 548 | process.nextTick(function () { |
| 549 | stream.destroy() |
| 550 | }) |
| 551 | }) |
| 552 | |
| 553 | function _end () { |
| 554 | stream.writable = false |
| 555 | end.call(stream) |
| 556 | if(!stream.readable && stream.autoDestroy) |
| 557 | stream.destroy() |
| 558 | } |
| 559 | |
| 560 | stream.end = function (data) { |