* Open the run's WebSocket tail (the coordinator's `fetch` returns a `101` with a * `webSocket`) and yield each chat `StreamChunk` as it arrives. Resolves when the * coordinator sends its terminal `status` frame (or the socket closes / the client * disconnects).
( coordinator: Coordinator, runId: string, threadId: string, signal: AbortSignal, )
| 176 | * disconnects). |
| 177 | */ |
| 178 | async function* tailRun( |
| 179 | coordinator: Coordinator, |
| 180 | runId: string, |
| 181 | threadId: string, |
| 182 | signal: AbortSignal, |
| 183 | ): AsyncGenerator<StreamChunk> { |
| 184 | // The host is irrelevant — the DO routes on the pathname; this is an in-process |
| 185 | // DO `fetch`, not a public request. |
| 186 | const streamUrl = `https://do/runs/${runId}/stream?threadId=${encodeURIComponent(threadId)}&lastSeq=-1` |
| 187 | const res = await coordinator.fetch(streamUrl, { |
| 188 | headers: { Upgrade: 'websocket' }, |
| 189 | }) |
| 190 | const socket = res.webSocket |
| 191 | if (!socket) { |
| 192 | throw new Error( |
| 193 | `agent stream did not upgrade to a WebSocket (status ${res.status})`, |
| 194 | ) |
| 195 | } |
| 196 | socket.accept() |
| 197 | |
| 198 | const queue: Array<StreamChunk> = [] |
| 199 | // Mutated from the socket/abort callbacks below. Held on an object (rather than |
| 200 | // bare `let`s) so the generator loop's checks aren't flagged as constant. |
| 201 | const state: { finished: boolean; failure: Error | null } = { |
| 202 | finished: false, |
| 203 | failure: null, |
| 204 | } |
| 205 | let wake: (() => void) | null = null |
| 206 | const signalReady = () => { |
| 207 | wake?.() |
| 208 | wake = null |
| 209 | } |
| 210 | |
| 211 | socket.addEventListener('message', (event: MessageEvent) => { |
| 212 | let parsed: unknown |
| 213 | try { |
| 214 | parsed = JSON.parse(typeof event.data === 'string' ? event.data : '') |
| 215 | } catch { |
| 216 | return |
| 217 | } |
| 218 | if (parsed === null || typeof parsed !== 'object') return |
| 219 | if ('type' in parsed && parsed.type === 'status') { |
| 220 | state.finished = true |
| 221 | } else if ('chunk' in parsed) { |
| 222 | queue.push(parsed.chunk as StreamChunk) |
| 223 | } |
| 224 | signalReady() |
| 225 | }) |
| 226 | socket.addEventListener('close', () => { |
| 227 | state.finished = true |
| 228 | signalReady() |
| 229 | }) |
| 230 | socket.addEventListener('error', () => { |
| 231 | state.failure = new Error('agent stream socket error') |
| 232 | state.finished = true |
| 233 | signalReady() |
| 234 | }) |
| 235 | const onAbort = () => { |
no test coverage detected