MCPcopy
hub / github.com/cloudflare/capnweb / readLoop

Method readLoop

src/rpc.ts:929–1056  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

927 }
928
929 private async readLoop() {
930 while (!this.abortReason) {
931 // Each receive needs its own abort promise so Promise.race() doesn't keep old reads.
932 let readCanceled = Promise.withResolvers<never>();
933 this.cancelReadLoop = readCanceled.reject;
934
935 let raw: unknown;
936
937 try {
938 raw = await Promise.race([this.transport.receive(), readCanceled.promise]);
939 } finally {
940 if (this.cancelReadLoop === readCanceled.reject) {
941 this.cancelReadLoop = undefined;
942 }
943 }
944 if (this.abortReason) break; // check again before processing
945
946 // Only parse JSON at "string" level; otherwise message is already an object
947 let msg = this.encodingLevel === "string" ? JSON.parse(raw as string) : raw;
948
949 if (msg instanceof Array) {
950 switch (msg[0]) {
951 case "push": // ["push", Expression]
952 if (msg.length > 1) {
953 let payload = new Evaluator(this, this.encodingLevel).evaluate(msg[1]);
954 let hook = new PayloadStubHook(payload);
955
956 // It's possible for a rejection to occur before the client gets a chance to send
957 // a "pull" message or to use the promise in a pipeline. We don't want that to be
958 // treated as an unhandled rejection on our end.
959 hook.ignoreUnhandledRejections();
960
961 this.exports.push({ hook, refcount: 1 });
962 continue;
963 }
964 break;
965
966 case "stream": { // ["stream", Expression]
967 // Like "push", but:
968 // - Promise pipelining on the result is not supported.
969 // - The export is automatically considered "pulled".
970 // - Once the "resolve" is sent, the export is implicitly released.
971 if (msg.length > 1) {
972 let payload = new Evaluator(this, this.encodingLevel).evaluate(msg[1]);
973 let hook = new PayloadStubHook(payload);
974 hook.ignoreUnhandledRejections();
975
976 let exportId = this.exports.length;
977 this.exports.push({ hook, refcount: 1, autoRelease: true });
978
979 // Automatically pull since stream messages are always pulled.
980 this.ensureResolvingExport(exportId);
981 continue;
982 }
983 break;
984 }
985
986 case "pipe": { // ["pipe"]

Callers 1

constructor · Method · 0.95

Calls 9

ensureResolvingExport · Method · 0.95
releaseExport · Method · 0.95
abort · Method · 0.95
evaluate · Method · 0.80
receive · Method · 0.65
parse · Method · 0.65
dispose · Method · 0.65
resolve · Method · 0.45

Tested by

no test coverage detected