(
body: ReadableStream<Uint8Array>,
options: ReadSSEStreamOptions = {}
)
| 214 | * @returns The accumulated content from the stream |
| 215 | */ |
| 216 | export async function readSSEStream( |
| 217 | body: ReadableStream<Uint8Array>, |
| 218 | options: ReadSSEStreamOptions = {} |
| 219 | ): Promise<string> { |
| 220 | const { onChunk, onAccumulated, signal } = options |
| 221 | const reader = body.getReader() |
| 222 | const decoder = new TextDecoder() |
| 223 | let accumulatedContent = '' |
| 224 | let buffer = '' |
| 225 | |
| 226 | try { |
| 227 | while (true) { |
| 228 | if (signal?.aborted) { |
| 229 | break |
| 230 | } |
| 231 | |
| 232 | const { done, value } = await reader.read() |
| 233 | |
| 234 | if (done) { |
| 235 | const remaining = decoder.decode() |
| 236 | if (remaining) { |
| 237 | buffer += remaining |
| 238 | } |
| 239 | break |
| 240 | } |
| 241 | |
| 242 | buffer += decoder.decode(value, { stream: true }) |
| 243 | const lines = buffer.split('\n\n') |
| 244 | buffer = lines.pop() || '' |
| 245 | |
| 246 | for (const line of lines) { |
| 247 | if (line.startsWith('data: ')) { |
| 248 | const lineData = line.substring(6) |
| 249 | if (lineData === '[DONE]') continue |
| 250 | |
| 251 | try { |
| 252 | const data = JSON.parse(lineData) |
| 253 | if (data.error) throw new Error(data.error) |
| 254 | if (data.chunk) { |
| 255 | accumulatedContent += data.chunk |
| 256 | onChunk?.(data.chunk) |
| 257 | onAccumulated?.(accumulatedContent) |
| 258 | } |
| 259 | if (data.done) break |
| 260 | } catch { |
| 261 | // Skip unparseable lines |
| 262 | } |
| 263 | } |
| 264 | } |
| 265 | } |
| 266 | } finally { |
| 267 | reader.releaseLock() |
| 268 | } |
| 269 | |
| 270 | return accumulatedContent |
| 271 | } |
no test coverage detected