(controller)
| 52 | |
| 53 | const stream = new ReadableStream<Uint8Array>({ |
| 54 | async start(controller) { |
| 55 | let lastEventId = fromEventId |
| 56 | const deadline = Date.now() + MAX_STREAM_DURATION_MS |
| 57 | let nextHeartbeatAt = Date.now() + HEARTBEAT_INTERVAL_MS |
| 58 | |
| 59 | /* Encode text and push it onto the stream; does nothing once closed is set. */ const enqueue = (text: string) => { |
| 60 | if (closed) return |
| 61 | try { |
| 62 | controller.enqueue(encoder.encode(text)) |
| 63 | } catch { /* enqueue threw (presumably the stream is no longer writable; confirm): stop all further writes instead of propagating */ |
| 64 | closed = true |
| 65 | } |
| 66 | } |
| 67 | |
| 68 | /* Forward each buffered event as an SSE data frame and record its eventId in lastEventId. */ const sendEvents = (events: TableEventEntry[]) => { |
| 69 | for (const entry of events) { |
| 70 | if (closed) return /* stop forwarding as soon as the stream is closed */ |
| 71 | enqueue(`data: ${JSON.stringify(entry)}\n\n`) |
| 72 | lastEventId = entry.eventId /* runs even when the enqueue above just failed and set closed */ |
| 73 | } |
| 74 | } |
| 75 | |
| 76 | /* Emit a "pruned" event carrying earliestEventId (null when undefined), then close the stream. */ const sendPrunedAndClose = (earliestEventId: number | undefined) => { |
| 77 | enqueue( |
| 78 | `event: pruned\ndata: ${JSON.stringify({ earliestEventId: earliestEventId ?? null })}\n\n` |
| 79 | ) |
| 80 | if (!closed) { /* skipped when already closed, including when the enqueue above failed */ |
| 81 | closed = true |
| 82 | try { |
| 83 | controller.close() |
| 84 | } catch {} /* best effort: errors thrown by close() are ignored */ |
| 85 | } |
| 86 | } |
| 87 | |
| 88 | /* Emit a keep-alive line stamped with the current time in epoch milliseconds. */ const sendHeartbeat = () => { |
| 89 | // SSE comment line — keeps proxies (ALB default 60s idle) from closing |
| 90 | // the connection during quiet periods. |
| 91 | enqueue(`: ping ${Date.now()}\n\n`) /* routed through enqueue, so nothing is written once closed */ |
| 92 | } |
| 93 | |
| 94 | try { |
| 95 | // Initial replay from buffer. |
| 96 | const initial = await readTableEventsSince(tableId, lastEventId) |
| 97 | if (initial.status === 'pruned') { |
| 98 | sendPrunedAndClose(initial.earliestEventId) |
| 99 | return |
| 100 | } |
| 101 | if (initial.status === 'unavailable') { |
| 102 | throw new Error(`Table event buffer unavailable: ${initial.error}`) |
| 103 | } |
| 104 | sendEvents(initial.events) |
| 105 | |
| 106 | // Stream loop — poll the buffer and forward new events. Workflow |
| 107 | // execution stream uses the same shape; pub/sub wakeups are an |
| 108 | // optimization we can add later if 500ms polling becomes a problem. |
| 109 | while (!closed && Date.now() < deadline) { |
| 110 | await sleep(POLL_INTERVAL_MS) |
| 111 | if (closed) return |
nothing calls this directly
no test coverage detected