MCPcopy Index your code
hub / github.com/simstudioai/sim / readSSEStream

Function readSSEStream

apps/sim/lib/core/utils/sse.ts:216–271  ·  view source on GitHub ↗
(
  body: ReadableStream<Uint8Array>,
  options: ReadSSEStreamOptions = {}
)

Source from the content-addressed store, hash-verified

214 * @returns The accumulated content from the stream
215 */
/**
 * Reads a server-sent-events byte stream and concatenates the `chunk` field of
 * every JSON `data: ` event into one string.
 *
 * Events are delimited by a blank line (`\n\n`). Events that do not start with
 * `data: `, the literal `[DONE]` marker, and payloads that are not valid JSON
 * are ignored. Reading stops at end-of-stream, when an event has a truthy
 * `done` field, or when `options.signal` is found aborted before a read.
 *
 * @param body - Stream to consume; a reader lock is held for the duration of
 *   the call and released before returning or throwing.
 * @param options - Optional `onChunk` (receives each chunk) and `onAccumulated`
 *   (receives the running total) callbacks, plus an abort `signal`.
 * @returns The accumulated content from the stream
 * @throws Error when an event carries a truthy `error` field.
 */
export async function readSSEStream(
  body: ReadableStream<Uint8Array>,
  options: ReadSSEStreamOptions = {}
): Promise<string> {
  const { onChunk, onAccumulated, signal } = options
  const reader = body.getReader()
  const decoder = new TextDecoder()
  let accumulatedContent = ''
  let buffer = ''

  // Applies a single event to the accumulated state. Returns true once the
  // payload signals `done`, telling the read loop to stop.
  const consumeEvent = (event: string): boolean => {
    if (!event.startsWith('data: ')) return false
    const payload = event.substring(6)
    if (payload === '[DONE]') return false

    let data
    try {
      data = JSON.parse(payload)
    } catch {
      // Skip unparseable lines
      return false
    }
    if (data === null || typeof data !== 'object') return false

    // Raised outside the JSON try/catch so a server-reported error reaches
    // the caller instead of being discarded as an "unparseable" line.
    if (data.error) {
      throw new Error(typeof data.error === 'string' ? data.error : JSON.stringify(data.error))
    }

    if (data.chunk) {
      accumulatedContent += data.chunk
      try {
        onChunk?.(data.chunk)
        onAccumulated?.(accumulatedContent)
      } catch {
        // Callback failures have never interrupted the read; keep it that way.
      }
    }

    return Boolean(data.done)
  }

  try {
    let finished = false

    while (!finished) {
      if (signal?.aborted) {
        break
      }

      const { done, value } = await reader.read()

      if (done) {
        // Flush any bytes the decoder was still holding, then handle a final
        // event that was not terminated by a blank line.
        buffer += decoder.decode()
        if (buffer) {
          consumeEvent(buffer)
        }
        break
      }

      buffer += decoder.decode(value, { stream: true })
      const events = buffer.split('\n\n')
      // The last piece is an incomplete event; keep it for the next read.
      buffer = events.pop() ?? ''

      for (const event of events) {
        if (consumeEvent(event)) {
          // `done` ends the whole read, not just this batch of events.
          finished = true
          break
        }
      }
    }
  } finally {
    reader.releaseLock()
  }

  return accumulatedContent
}

Callers 4

streaming.test.tsFile · 0.90
sse.test.tsFile · 0.90
useWandFunction · 0.90

Calls 2

parseMethod · 0.80
readMethod · 0.45

Tested by

no test coverage detected