MCPcopy Index your code
hub / github.com/simstudioai/sim / processSSEStream

Function processSSEStream

apps/sim/lib/copilot/request/go/parser.ts:22–129  ·  view source on GitHub ↗
(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  decoder: TextDecoder,
  abortSignal: AbortSignal | undefined,
  onEvent: (event: unknown) => boolean | undefined | Promise<boolean | undefined>
)

Source from the content-addressed store, hash-verified

20 * @param onEvent Called per parsed event. Return true to stop processing.
21 */
22export async function processSSEStream(
23 reader: ReadableStreamDefaultReader<Uint8Array>,
24 decoder: TextDecoder,
25 abortSignal: AbortSignal | undefined,
26 onEvent: (event: unknown) => boolean | undefined | Promise<boolean | undefined>
27): Promise<void> {
28 let buffer = ''
29
30 try {
31 try {
32 while (true) {
33 if (abortSignal?.aborted) {
34 logger.info('SSE stream aborted by signal')
35 break
36 }
37
38 const { done, value } = await reader.read()
39 if (done) break
40
41 buffer += decoder.decode(value, { stream: true })
42 const lines = buffer.split('\n')
43 buffer = lines.pop() || ''
44
45 let stopped = false
46 for (const line of lines) {
47 const normalizedLine = normalizeSseLine(line)
48 if (abortSignal?.aborted) {
49 logger.info('SSE stream aborted mid-chunk (between events)')
50 return
51 }
52 if (!normalizedLine.trim()) continue
53 if (!normalizedLine.startsWith('data: ')) continue
54
55 const jsonStr = normalizedLine.slice(6)
56 if (jsonStr === '[DONE]') continue
57
58 let parsed: unknown
59 try {
60 parsed = JSON.parse(jsonStr)
61 } catch (error) {
62 const preview = jsonStr.slice(0, 200)
63 const detail = toError(error).message
64 throw createParseFailure(`Failed to parse SSE event JSON: ${detail}`, preview)
65 }
66
67 try {
68 if (await onEvent(parsed)) {
69 stopped = true
70 break
71 }
72 } catch (error) {
73 if (error instanceof FatalSseEventError) {
74 throw error
75 }
76 logger.warn('Failed to handle SSE event', {
77 preview: jsonStr.slice(0, 200),
78 error: toError(error).message,
79 })

Callers 1

runStreamLoop (Function) · 0.90

Calls 7

toError (Function) · 0.90
normalizeSseLine (Function) · 0.85
createParseFailure (Function) · 0.85
info (Method) · 0.80
parse (Method) · 0.80
warn (Method) · 0.65
read (Method) · 0.45

Tested by

no test coverage detected