MCPcopy
hub / github.com/CopilotKit/CopilotKit / MessageStream

Class MessageStream

packages/bot-slack/src/message-stream.ts:24–93  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

22const DEFAULT_MIN_INTERVAL_MS = 800;
23
24export class MessageStream {
25 private buffer = "";
26 private posted = "";
27 private queue: Promise<void> = Promise.resolve();
28 private lastFlushedAt = 0;
29 private flushTimer: NodeJS.Timeout | undefined;
30 private readonly minIntervalMs: number;
31 private readonly update: (text: string) => Promise<void>;
32
33 constructor(config: MessageStreamConfig) {
34 this.update = config.update;
35 this.minIntervalMs = config.minIntervalMs ?? DEFAULT_MIN_INTERVAL_MS;
36 }
37
38 /** Replace the in-flight buffer (callers pass the accumulated text). */
39 append(text: string): void {
40 if (text === this.buffer) return;
41 this.buffer = text;
42 this.scheduleFlush();
43 }
44
45 /**
46 * Mark the stream done. Cancels any pending throttled flush, enqueues a
47 * final flush, and resolves once the entire queue (including the final
48 * flush and anything previously in flight) has drained.
49 *
50 * After this resolves, the Slack message reflects the final buffer state.
51 */
52 async finish(): Promise<void> {
53 if (this.flushTimer) {
54 clearTimeout(this.flushTimer);
55 this.flushTimer = undefined;
56 }
57 this.enqueueFlush();
58 await this.queue;
59 }
60
61 private scheduleFlush(): void {
62 if (this.flushTimer) return;
63 const elapsed = Date.now() - this.lastFlushedAt;
64 const delay = Math.max(0, this.minIntervalMs - elapsed);
65 this.flushTimer = setTimeout(() => {
66 this.flushTimer = undefined;
67 this.enqueueFlush();
68 }, delay);
69 }
70
71 private enqueueFlush(): void {
72 this.queue = this.queue.then(() => this.flushNow());
73 }
74
75 private async flushNow(): Promise<void> {
76 if (this.buffer === this.posted) return;
77 const text = this.buffer;
78 this.posted = text;
79 try {
80 await this.update(text);
81 } catch (err) {

Callers

nothing calls this directly

Calls 1

resolveMethod · 0.80

Tested by

no test coverage detected

Used in the wild — real call sites across dependent graphs

searching dependent graphs…