MCPcopy
hub / github.com/cloudflare/capnweb / ObjectTestTransport

Class ObjectTestTransport

__tests__/index.test.ts:479–533  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

477}
478
/**
 * In-memory transport used by tests: two instances are paired, and every message
 * passed to `send()` on one side is cloned and appended to the other side's queue,
 * from which `receive()` hands messages out in FIFO order.
 *
 * Delivery can be paused with `fence()` / `releaseFence()`, and `abort()` fails the
 * receiving side.
 *
 * Only one `receive()` call may be outstanding at a time: a single waiter slot is
 * kept, so a second concurrent call would overwrite the first call's callbacks.
 */
class ObjectTestTransport implements RpcTransportWithCustomEncoding {
  /**
   * @param partner Transport on the other end. When given, the link is made
   *     bidirectional by also setting `partner.partner` to this instance.
   * @param encodingLevel Selects how `send()` copies messages: `"jsonCompatible"`
   *     uses a JSON round-trip, any other value uses `structuredClone()`.
   */
  constructor(
      private partner?: ObjectTestTransport,
      readonly encodingLevel: CustomEncodingLevel = "jsonCompatible") {
    if (partner) {
      partner.partner = this;
    }
  }

  /** Messages sent by the partner that `receive()` has not yet handed out. */
  private queue: unknown[] = [];
  /** Resolves the promise a blocked `receive()` is awaiting, if any. */
  private waiter?: () => void;
  /** Rejects the promise a blocked `receive()` is awaiting, if any. */
  private aborter?: (err: unknown) => void;
  /** While true, `receive()` holds back messages even if some are queued. */
  private fenced = false;
  /**
   * Set by the first `abort()` call. A wrapper object is used (rather than the bare
   * reason) so that a falsy or `undefined` reason still counts as "aborted".
   */
  private abortState?: { reason: unknown };

  /**
   * Clones `message` and appends it to the partner's queue, waking the partner's
   * blocked `receive()` unless the partner is fenced.
   *
   * @throws Error if this transport has no partner.
   */
  send(message: unknown): void {
    const partner = this.partner;
    if (!partner) {
      throw new Error("ObjectTestTransport.send() called on a transport that has no partner");
    }

    // Copy the message so sender and receiver never share object identity.
    const cloned = this.encodingLevel === "jsonCompatible"
        ? JSON.parse(JSON.stringify(message))
        : structuredClone(message);

    partner.queue.push(cloned);
    if (!partner.fenced) {
      partner.wake();
    }
  }

  /**
   * Resolves with the oldest queued message, waiting until one is available and the
   * fence (if any) has been released.
   *
   * Rejects with the abort reason if `abort()` is called while waiting, or if the
   * transport was already aborted at a point where this call would otherwise have
   * to wait.
   */
  async receive(): Promise<unknown> {
    while (this.queue.length === 0 || this.fenced) {
      if (this.abortState) {
        // Nothing can be delivered right now and the transport is dead: fail
        // instead of parking a waiter that nobody would ever reject.
        throw this.abortState.reason;
      }
      await new Promise<void>((resolve, reject) => {
        this.waiter = resolve;
        this.aborter = reject;
      });
    }

    return this.queue.shift();
  }

  /** Pauses delivery: `receive()` blocks until `releaseFence()`, even with queued messages. */
  fence() {
    this.fenced = true;
  }

  /** Resumes delivery and wakes a blocked `receive()` if a message is already queued. */
  releaseFence() {
    this.fenced = false;
    if (this.queue.length > 0) {
      this.wake();
    }
  }

  /**
   * Marks the transport as aborted and rejects the blocked `receive()`, if any,
   * with `reason`. The first reason is remembered so that later `receive()` calls
   * that would have to wait reject with it instead of hanging.
   */
  abort(reason: any): void {
    if (!this.abortState) {
      this.abortState = { reason };
    }
    const reject = this.aborter;
    this.waiter = undefined;
    this.aborter = undefined;
    reject?.(reason);
  }

  /** Resolves the blocked `receive()`, if there is one, and clears the waiter slot. */
  private wake(): void {
    const resolve = this.waiter;
    if (resolve) {
      this.waiter = undefined;
      this.aborter = undefined;
      resolve();
    }
  }
}
534
535// Spin the microtask queue a bit to give messages time to be delivered and handled.
536async function pumpMicrotasks() {

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected

Used in the wild — real call sites across dependent graphs

searching dependent graphs…