(source: SSESource, options: ReadSSELinesOptions)
| 128 | * @param options - The `onData` callback plus an optional `signal`. |
| 129 | */ |
| 130 | export async function readSSELines(source: SSESource, options: ReadSSELinesOptions): Promise<void> { |
| 131 | const { onData, signal } = options |
| 132 | const { reader, ownsLock } = toReader(source) |
| 133 | const decoder = new TextDecoder() |
| 134 | let buffer = '' |
| 135 | |
| 136 | try { |
| 137 | while (true) { |
| 138 | if (signal?.aborted) break |
| 139 | |
| 140 | const { done, value } = await reader.read() |
| 141 | |
| 142 | buffer += done ? decoder.decode() : decoder.decode(value, { stream: true }) |
| 143 | const lines = buffer.split('\n') |
| 144 | buffer = done ? '' : (lines.pop() ?? '') |
| 145 | |
| 146 | for (const rawLine of lines) { |
| 147 | if (signal?.aborted) return |
| 148 | |
| 149 | const line = stripCarriageReturn(rawLine) |
| 150 | if (!line.startsWith('data:')) continue |
| 151 | |
| 152 | let data = line.slice(5) |
| 153 | if (data.startsWith(' ')) data = data.slice(1) |
| 154 | if (data === DONE_SENTINEL) continue |
| 155 | |
| 156 | if ((await onData(data)) === true) return |
| 157 | } |
| 158 | |
| 159 | if (done) break |
| 160 | } |
| 161 | } finally { |
| 162 | if (ownsLock) reader.releaseLock() |
| 163 | } |
| 164 | } |
| 165 | |
| 166 | /** |
| 167 | * The JSON convenience layer over {@link readSSELines}: invokes `onEvent` once |
no test coverage detected