MCPcopy
hub / github.com/LAION-AI/Open-Assistant / handleChatEventStream

Function handleChatEventStream

website/src/lib/chat_stream.ts:15–50  ·  view source on GitHub ↗
({
  stream,
  onError,
  onPending,
  onToken,
}: ChatStreamHandlerOptions)

Source from the content-addressed store, hash-verified

13}
14
15export async function handleChatEventStream({
16 stream,
17 onError,
18 onPending,
19 onToken,
20}: ChatStreamHandlerOptions): Promise<InferenceMessage | null> {
21 let tokens = "";
22 for await (const { event, data } of iteratorSSE(stream)) {
23 if (event === "error") {
24 await onError(data);
25 } else if (event === "ping") {
26 continue;
27 }
28 try {
29 const chunk: InferenceEvent = JSON.parse(data);
30 if (chunk.event_type === "pending") {
31 await onPending({ queuePosition: chunk.queue_position, queueSize: chunk.queue_size });
32 } else if (chunk.event_type === "token") {
33 tokens += chunk.text;
34 await onToken(tokens);
35 } else if (chunk.event_type === "message") {
36 // final message
37 return chunk.message;
38 } else if (chunk.event_type === "error") {
39 // handle error
40 await onError(chunk.error);
41 return chunk.message;
42 } else {
43 console.error("Unexpected event", chunk);
44 }
45 } catch (e) {
46 console.error(`Error parsing data: ${data}, error: ${e}`);
47 }
48 }
49 return null;
50}
51
52export async function* iteratorSSE(stream: ReadableStream<Uint8Array>) {
53 const reader = stream.pipeThrough(new TextDecoderStream()).getReader();

Callers 1

Calls 2

iteratorSSEFunction · 0.85
parseMethod · 0.45

Tested by

no test coverage detected