MCPcopy Index your code
hub / github.com/simstudioai/sim / start

Function start

apps/sim/app/api/table/[tableId]/events/stream/route.ts:54–151  ·  view source on GitHub ↗
(controller)

Source from the content-addressed store, hash-verified

52
53 const stream = new ReadableStream<Uint8Array>({
54 async start(controller) {
55 let lastEventId = fromEventId
56 const deadline = Date.now() + MAX_STREAM_DURATION_MS
57 let nextHeartbeatAt = Date.now() + HEARTBEAT_INTERVAL_MS
58
59 const enqueue = (text: string) => {
60 if (closed) return
61 try {
62 controller.enqueue(encoder.encode(text))
63 } catch {
64 closed = true
65 }
66 }
67
68 const sendEvents = (events: TableEventEntry[]) => {
69 for (const entry of events) {
70 if (closed) return
71 enqueue(`data: ${JSON.stringify(entry)}\n\n`)
72 lastEventId = entry.eventId
73 }
74 }
75
76 const sendPrunedAndClose = (earliestEventId: number | undefined) => {
77 enqueue(
78 `event: pruned\ndata: ${JSON.stringify({ earliestEventId: earliestEventId ?? null })}\n\n`
79 )
80 if (!closed) {
81 closed = true
82 try {
83 controller.close()
84 } catch {}
85 }
86 }
87
88 const sendHeartbeat = () => {
89 // SSE comment line — keeps proxies (ALB default 60s idle) from closing
90 // the connection during quiet periods.
91 enqueue(`: ping ${Date.now()}\n\n`)
92 }
93
94 try {
95 // Initial replay from buffer.
96 const initial = await readTableEventsSince(tableId, lastEventId)
97 if (initial.status === 'pruned') {
98 sendPrunedAndClose(initial.earliestEventId)
99 return
100 }
101 if (initial.status === 'unavailable') {
102 throw new Error(`Table event buffer unavailable: ${initial.error}`)
103 }
104 sendEvents(initial.events)
105
106 // Stream loop — poll the buffer and forward new events. Workflow
107 // execution stream uses the same shape; pub/sub wakeups are an
108 // optimization we can add later if 500ms polling becomes a problem.
109 while (!closed && Date.now() < deadline) {
110 await sleep(POLL_INTERVAL_MS)
111 if (closed) return

Callers

nothing calls this directly

Calls 9

readTableEventsSinceFunction · 0.90
sleepFunction · 0.90
toErrorFunction · 0.90
sendPrunedAndCloseFunction · 0.85
sendEventsFunction · 0.85
sendHeartbeatFunction · 0.85
errorMethod · 0.80
enqueueFunction · 0.70
closeMethod · 0.65

Tested by

no test coverage detected