MCPcopy
hub / github.com/CopilotKit/CopilotKit / NativeMessageStream

Class NativeMessageStream

packages/bot-slack/src/native-stream.ts:93–309  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

91const APPEND_CHAR_LIMIT = 12000;
92
93export class NativeMessageStream implements TextStream {
94 private buffer = "";
95 private queue: Promise<void> = Promise.resolve();
96 private lastFlushedAt = 0;
97 private flushTimer: ReturnType<typeof setTimeout> | undefined;
98 private finished = false;
99
100 /** Current streamed message ts (undefined until the first `startStream`). */
101 private curTs: string | undefined;
102 /** Buffer chars already appended as text to the current message. */
103 private curPosted = 0;
104 /** ts of the first streamed message (for the returned MessageRef). */
105 private firstTsValue: string | undefined;
106
107 /** Set once `startStream` has failed and we've fallen back to the legacy transport. */
108 private legacy: TextStream | undefined;
109 /** Set once a chunk append has failed/been refused, so we stop trying. */
110 private chunksDisabled = false;
111
112 private readonly transport: NativeStreamTransport;
113 private readonly makeFallback: () => TextStream;
114 private readonly onStartFailure: ((err: unknown) => void) | undefined;
115 private readonly onChunkFailure: ((err: unknown) => void) | undefined;
116 private readonly minIntervalMs: number;
117
118 constructor(config: NativeMessageStreamConfig) {
119 this.transport = config.transport;
120 this.makeFallback = config.fallback;
121 this.onStartFailure = config.onStartFailure;
122 this.onChunkFailure = config.onChunkFailure;
123 this.minIntervalMs = config.minIntervalMs ?? DEFAULT_MIN_INTERVAL_MS;
124 }
125
126 /** The first streamed message's ts (or the fallback's), available after finish(). */
127 get firstTs(): string | undefined {
128 return this.firstTsValue;
129 }
130
131 append(fullText: string): void {
132 if (this.legacy) {
133 this.legacy.append(fullText);
134 return;
135 }
136 if (fullText === this.buffer) return;
137 this.buffer = fullText;
138 this.scheduleFlush();
139 }
140
141 /**
142 * Append a structured chunk (`task_update` / `plan_update` / `blocks`) to the
143 * streamed message. Flushes any pending text first so the chunk lands AFTER
144 * the text emitted so far. No-op (firing `onChunkFailure` once) when the
145 * stream has fallen back to legacy or chunks were already refused.
146 */
147 appendChunk(chunk: AnyChunk): void {
148 if (this.legacy || this.chunksDisabled) {
149 if (!this.chunksDisabled) {
150 this.chunksDisabled = true;

Callers

nothing calls this directly

Calls 1

resolveMethod · 0.80

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…