( reader: ReadableStreamDefaultReader<Uint8Array>, decoder: TextDecoder, abortSignal: AbortSignal | undefined, onEvent: (event: unknown) => boolean | undefined | Promise<boolean | undefined> )
| 20 | * @param onEvent Called per parsed event. Return true to stop processing. |
| 21 | */ |
| 22 | export async function processSSEStream( |
| 23 | reader: ReadableStreamDefaultReader<Uint8Array>, |
| 24 | decoder: TextDecoder, |
| 25 | abortSignal: AbortSignal | undefined, |
| 26 | onEvent: (event: unknown) => boolean | undefined | Promise<boolean | undefined> |
| 27 | ): Promise<void> { |
| 28 | let buffer = '' |
| 29 | |
| 30 | try { |
| 31 | try { |
| 32 | while (true) { |
| 33 | if (abortSignal?.aborted) { |
| 34 | logger.info('SSE stream aborted by signal') |
| 35 | break |
| 36 | } |
| 37 | |
| 38 | const { done, value } = await reader.read() |
| 39 | if (done) break |
| 40 | |
| 41 | buffer += decoder.decode(value, { stream: true }) |
| 42 | const lines = buffer.split('\n') |
| 43 | buffer = lines.pop() || '' |
| 44 | |
| 45 | let stopped = false |
| 46 | for (const line of lines) { |
| 47 | const normalizedLine = normalizeSseLine(line) |
| 48 | if (abortSignal?.aborted) { |
| 49 | logger.info('SSE stream aborted mid-chunk (between events)') |
| 50 | return |
| 51 | } |
| 52 | if (!normalizedLine.trim()) continue |
| 53 | if (!normalizedLine.startsWith('data: ')) continue |
| 54 | |
| 55 | const jsonStr = normalizedLine.slice(6) |
| 56 | if (jsonStr === '[DONE]') continue |
| 57 | |
| 58 | let parsed: unknown |
| 59 | try { |
| 60 | parsed = JSON.parse(jsonStr) |
| 61 | } catch (error) { |
| 62 | const preview = jsonStr.slice(0, 200) |
| 63 | const detail = toError(error).message |
| 64 | throw createParseFailure(`Failed to parse SSE event JSON: ${detail}`, preview) |
| 65 | } |
| 66 | |
| 67 | try { |
| 68 | if (await onEvent(parsed)) { |
| 69 | stopped = true |
| 70 | break |
| 71 | } |
| 72 | } catch (error) { |
| 73 | if (error instanceof FatalSseEventError) { |
| 74 | throw error |
| 75 | } |
| 76 | logger.warn('Failed to handle SSE event', { |
| 77 | preview: jsonStr.slice(0, 200), |
| 78 | error: toError(error).message, |
| 79 | }) |
no test coverage detected