| 682 | } |
| 683 | |
| 684 | createPipe(readable: ReadableStream, readableHook: StubHook): ImportId { |
| 685 | if (this.abortReason) throw this.abortReason; |
| 686 | |
| 687 | this.send(["pipe"]); |
| 688 | |
| 689 | let importId = this.imports.length; |
| 690 | // The pipe import is not a promise -- it's immediately usable as a writable stream. |
| 691 | let entry = new ImportTableEntry(this, importId, false); |
| 692 | this.imports.push(entry); |
| 693 | |
| 694 | // Create a proxy WritableStream from the import hook and pump the ReadableStream into it. |
| 695 | let hook = new RpcImportHook(/*isPromise=*/false, entry); |
| 696 | let writable = streamImpl.createWritableStreamFromHook(hook); |
| 697 | readable.pipeTo(writable).catch(() => { |
| 698 | // Errors are handled by the writable stream's error handling -- either the write fails |
| 699 | // and the writable side reports it, or the readable side errors and pipeTo aborts the |
| 700 | // writable side. Either way, the hook's disposal will handle cleanup. |
| 701 | }).finally(() => readableHook.dispose()); |
| 702 | |
| 703 | return importId; |
| 704 | } |
| 705 | |
| 706 | // Serializes and sends a message. Returns the byte length reported by the transport, or |
| 707 | // undefined if the transport doesn't report size. |