()
| 927 | } |
| 928 | |
| 929 | private async readLoop() { |
| 930 | while (!this.abortReason) { |
| 931 | // Each receive needs its own abort promise so Promise.race() doesn't keep old reads. |
| 932 | let readCanceled = Promise.withResolvers<never>(); |
| 933 | this.cancelReadLoop = readCanceled.reject; |
| 934 | |
| 935 | let raw: unknown; |
| 936 | |
| 937 | try { |
| 938 | raw = await Promise.race([this.transport.receive(), readCanceled.promise]); |
| 939 | } finally { |
| 940 | if (this.cancelReadLoop === readCanceled.reject) { |
| 941 | this.cancelReadLoop = undefined; |
| 942 | } |
| 943 | } |
| 944 | if (this.abortReason) break; // check again before processing |
| 945 | |
| 946 | // Only parse JSON at "string" level; otherwise message is already an object |
| 947 | let msg = this.encodingLevel === "string" ? JSON.parse(raw as string) : raw; |
| 948 | |
| 949 | if (msg instanceof Array) { |
| 950 | switch (msg[0]) { |
| 951 | case "push": // ["push", Expression] |
| 952 | if (msg.length > 1) { |
| 953 | let payload = new Evaluator(this, this.encodingLevel).evaluate(msg[1]); |
| 954 | let hook = new PayloadStubHook(payload); |
| 955 | |
| 956 | // It's possible for a rejection to occur before the client gets a chance to send |
| 957 | // a "pull" message or to use the promise in a pipeline. We don't want that to be |
| 958 | // treated as an unhandled rejection on our end. |
| 959 | hook.ignoreUnhandledRejections(); |
| 960 | |
| 961 | this.exports.push({ hook, refcount: 1 }); |
| 962 | continue; |
| 963 | } |
| 964 | break; |
| 965 | |
| 966 | case "stream": { // ["stream", Expression] |
| 967 | // Like "push", but: |
| 968 | // - Promise pipelining on the result is not supported. |
| 969 | // - The export is automatically considered "pulled". |
| 970 | // - Once the "resolve" is sent, the export is implicitly released. |
| 971 | if (msg.length > 1) { |
| 972 | let payload = new Evaluator(this, this.encodingLevel).evaluate(msg[1]); |
| 973 | let hook = new PayloadStubHook(payload); |
| 974 | hook.ignoreUnhandledRejections(); |
| 975 | |
| 976 | let exportId = this.exports.length; |
| 977 | this.exports.push({ hook, refcount: 1, autoRelease: true }); |
| 978 | |
| 979 | // Automatically pull since stream messages are always pulled. |
| 980 | this.ensureResolvingExport(exportId); |
| 981 | continue; |
| 982 | } |
| 983 | break; |
| 984 | } |
| 985 | |
| 986 | case "pipe": { // ["pipe"] |
no test coverage detected