(controller: ReadableStreamDefaultController)
| 267 | }) |
| 268 | |
| 269 | async function startInner(controller: ReadableStreamDefaultController) { |
| 270 | let cursor = afterCursor || '0' |
| 271 | let controllerClosed = false |
| 272 | let sawTerminalEvent = false |
| 273 | let currentRequestId = extractRunRequestId(run) |
| 274 | let lastWriteTime = Date.now() |
| 275 | // Stamp the logical request id + chat id on the resume root as soon |
| 276 | // as we resolve them from the run row, so TraceQL joins work on |
| 277 | // resume legs the same way they do on the original POST. |
| 278 | if (currentRequestId) { |
| 279 | rootSpan.setAttribute(TraceAttr.RequestId, currentRequestId) |
| 280 | rootSpan.setAttribute(TraceAttr.SimRequestId, currentRequestId) |
| 281 | } |
| 282 | if (run?.chatId) { |
| 283 | rootSpan.setAttribute(TraceAttr.ChatId, run.chatId) |
| 284 | } |
| 285 | |
/**
 * Closes the stream controller at most once.
 *
 * `controllerClosed` is flipped before `controller.close()` is attempted, so
 * repeated calls are no-ops even when the first attempt throws.
 */
const closeController = () => {
  if (!controllerClosed) {
    controllerClosed = true
    try {
      controller.close()
    } catch {
      // close() threw: the controller was already closed by the runtime/client.
    }
  }
}
| 295 | |
/**
 * Encodes `payload` with `encodeSSEEnvelope` and enqueues it on the controller.
 *
 * On success `lastWriteTime` is refreshed. Any throw from encoding or from
 * `controller.enqueue` marks the controller as closed.
 *
 * @param payload - Value handed to `encodeSSEEnvelope`.
 * @returns `true` when the chunk was enqueued; `false` when the controller was
 * already marked closed or the write attempt threw.
 */
const enqueueEvent = (payload: unknown) => {
  let written = false
  if (!controllerClosed) {
    try {
      controller.enqueue(encodeSSEEnvelope(payload))
      lastWriteTime = Date.now()
      written = true
    } catch {
      // Treat a failed write as a closed stream so later writes are skipped.
      controllerClosed = true
    }
  }
  return written
}
| 307 | |
/**
 * Encodes `comment` with `encodeSSEComment` and enqueues it on the controller.
 *
 * A successful write refreshes `lastWriteTime`. Any throw while encoding or
 * enqueueing marks the controller as closed.
 *
 * @param comment - Text handed to `encodeSSEComment`.
 * @returns `true` when the chunk was enqueued, otherwise `false`.
 */
const enqueueComment = (comment: string) => {
  if (!controllerClosed) {
    try {
      const chunk = encodeSSEComment(comment)
      controller.enqueue(chunk)
      lastWriteTime = Date.now()
      return true
    } catch {
      // Treat a failed write as a closed stream so later writes are skipped.
      controllerClosed = true
    }
  }
  return false
}
| 319 | |
/**
 * Abort handler: only flags the controller as closed. It does not call
 * `controller.close()` itself; code that checks `controllerClosed` will
 * short-circuit once this has run.
 */
const abortListener = (): void => {
  controllerClosed = true
}
| 323 | request.signal.addEventListener('abort', abortListener, { once: true }) |
| 324 | |
| 325 | const flushEvents = async () => { |
| 326 | const events = await readEvents(streamId, cursor) |
no test coverage detected