MCPcopy
hub / github.com/cloudflare/capnweb / send

Method send

src/rpc.ts:708–763  ·  view source on GitHub ↗
(msg: any)

Source from the content-addressed store, hash-verified

706 // Serializes and sends a message. Returns the byte length reported by the transport, or
707 // undefined if the transport doesn't report size.
708 private send(msg: any): number | undefined {
709 if (this.abortReason !== undefined) {
710 // Ignore sends after we've aborted.
711 return 0;
712 }
713
714 if (this.encodingLevel === "string") {
715 let msgText: string;
716 try {
717 msgText = JSON.stringify(msg);
718 } catch (err) {
719 // If JSON stringification failed, there's something wrong with the devaluator, as it
720 // should not allow non-JSONable values to be injected in the first place.
721 try { this.abort(err); } catch (err2) {}
722 throw err;
723 }
724
725 try {
726 let sent = (this.transport as RpcTransport).send(msgText) as Promise<void> | undefined;
727 if (sent !== undefined && typeof sent.catch === "function") {
728 // If send fails, abort the connection, but don't try to send an abort message since
729 // that'll probably also fail.
730 sent.catch(err => this.abort(err, false));
731 }
732 } catch (err) {
733 // The transport threw synchronously. Treat it like an async send failure: abort the
734 // session (without trying to send an abort message over the broken transport), but
735 // defer to a microtask so the caller finishes its own bookkeeping first, matching the
736 // timing of a rejected promise from an async transport.
737 queueMicrotask(() => this.abort(err, false));
738 }
739 return msgText.length;
740 } else {
741 // Custom encoding transport encodes and returns the actual encoded size, or void if size
742 // is unavailable (e.g. structured clone).
743 try {
744 let size = (this.transport as RpcTransportWithCustomEncoding).send(msg);
745 if (typeof size === "number") {
746 return size;
747 }
748 // Defend against transports that return something other than a number, e.g. an
749 // accidentally-async `send()` returning a promise: treat the size as unknown, and
750 // observe any returned thenable so a rejection aborts the session rather than going
751 // unhandled. (The documented contract is to report errors via `receive()`.)
752 let thenable = size as unknown;
753 if (thenable && typeof (thenable as PromiseLike<unknown>).then === "function") {
754 Promise.resolve(thenable).catch(err => this.abort(err, false));
755 }
756 return undefined;
757 } catch (err) {
758 // Same as the synchronous failure case above.
759 queueMicrotask(() => this.abort(err, false));
760 return undefined;
761 }
762 }
763 }
764
765 sendCall(id: ImportId, path: PropertyPath, args?: RpcPayload): RpcImportHook {

Callers 7

ensureResolvingExportMethod · 0.95
createPipeMethod · 0.95
sendCallMethod · 0.95
sendStreamMethod · 0.95
sendMapMethod · 0.95
sendPullMethod · 0.95
sendReleaseMethod · 0.95

Calls 4

abortMethod · 0.95
catchMethod · 0.80
sendMethod · 0.65
resolveMethod · 0.45

Tested by

no test coverage detected