MCPcopy
hub / github.com/coder/mux / createAsyncMessageQueue

Function createAsyncMessageQueue

src/common/utils/asyncMessageQueue.ts:24–72  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

22 * ```
23 */
24export function createAsyncMessageQueue<T>(): {
25 push: (msg: T) => void;
26 iterate: () => AsyncGenerator<T>;
27 end: () => void;
28} {
29 const queue: T[] = [];
30 let resolveNext: (() => void) | null = null;
31 let ended = false;
32
33 const push = (msg: T) => {
34 if (ended) return;
35 queue.push(msg);
36 // Signal that new messages are available
37 if (resolveNext) {
38 const resolve = resolveNext;
39 resolveNext = null;
40 resolve();
41 }
42 };
43
44 async function* iterate(): AsyncGenerator<T> {
45 while (!ended) {
46 // Yield all queued messages synchronously (no async boundaries)
47 // This ensures all messages from a batch are processed in the same
48 // event loop tick, preventing premature renders
49 while (queue.length > 0) {
50 yield queue.shift()!;
51 }
52 // Wait for more messages
53 await new Promise<void>((resolve) => {
54 resolveNext = resolve;
55 });
56 }
57 // Yield any remaining messages after end() is called
58 while (queue.length > 0) {
59 yield queue.shift()!;
60 }
61 }
62
63 const end = () => {
64 ended = true;
65 // Wake up the iterator so it can exit
66 if (resolveNext) {
67 resolveNext();
68 }
69 };
70
71 return { push, iterate, end };
72}

Callers 3

createMockORPCClientFunction · 0.90
routerFunction · 0.90
runChatSubscriptionMethod · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected