MCPcopy
hub / github.com/inkeep/open-knowledge / createMessageReader

Function createMessageReader

packages/cli/src/mcp/bridge-e2e.test.ts:38–100  ·  view source on GitHub ↗
(stdout: PassThrough)

Source from the content-addressed store, hash-verified

36}
37
38function createMessageReader(stdout: PassThrough): {
39 waitFor: (
40 predicate: (message: Record<string, unknown>) => boolean,
41 ) => Promise<Record<string, unknown>>;
42} {
43 let buffer = '';
44 const messages: Record<string, unknown>[] = [];
45 const waiters: Array<{
46 predicate: (message: Record<string, unknown>) => boolean;
47 resolve: (message: Record<string, unknown>) => void;
48 reject: (err: Error) => void;
49 timer: ReturnType<typeof setTimeout>;
50 }> = [];
51
52 function drainWaiters(): void {
53 for (let waiterIndex = 0; waiterIndex < waiters.length; waiterIndex++) {
54 const waiter = waiters[waiterIndex];
55 if (!waiter) continue;
56 const messageIndex = messages.findIndex(waiter.predicate);
57 if (messageIndex === -1) continue;
58 const [message] = messages.splice(messageIndex, 1);
59 clearTimeout(waiter.timer);
60 waiters.splice(waiterIndex, 1);
61 waiter.resolve(message);
62 waiterIndex--;
63 }
64 }
65
66 stdout.on('data', (chunk: Buffer) => {
67 buffer += chunk.toString('utf8');
68 while (true) {
69 const newline = buffer.indexOf('\n');
70 if (newline === -1) break;
71 const line = buffer.slice(0, newline).replace(/\r$/, '');
72 buffer = buffer.slice(newline + 1);
73 if (line.length === 0) continue;
74 const parsed = JSON.parse(line) as unknown;
75 if (!isRecord(parsed)) {
76 throw new Error(`stdio response was not a JSON object: ${line}`);
77 }
78 messages.push(parsed);
79 }
80 drainWaiters();
81 });
82
83 return {
84 waitFor(predicate) {
85 const existingIndex = messages.findIndex(predicate);
86 if (existingIndex !== -1) {
87 const [message] = messages.splice(existingIndex, 1);
88 return Promise.resolve(message);
89 }
90 return new Promise((resolve, reject) => {
91 const timer = setTimeout(() => {
92 const index = waiters.findIndex((waiter) => waiter.timer === timer);
93 if (index !== -1) waiters.splice(index, 1);
94 reject(new Error('timed out waiting for stdio JSON-RPC response'));
95 }, 5_000);

Callers 1

bridge-e2e.test.tsFile · 0.85

Calls 5

parseMethod · 0.80
isRecordFunction · 0.70
drainWaitersFunction · 0.70
onMethod · 0.65
pushMethod · 0.45

Tested by

no test coverage detected