(controller)
| 1222 | |
| 1223 | const stream = new ReadableStream<Uint8Array>({ |
| 1224 | async start(controller) { |
| 1225 | let finalMetaStatus: 'complete' | 'error' | 'cancelled' | null = null |
| 1226 | |
| 1227 | registerManualExecutionAborter(executionId, timeoutController.abort) |
| 1228 | isManualAbortRegistered = true |
| 1229 | |
| 1230 | let terminalEventPublished = false |
| 1231 | const sendEvent = async ( |
| 1232 | event: ExecutionEvent, |
| 1233 | terminalStatus?: TerminalExecutionStreamStatus |
| 1234 | ) => { |
| 1235 | const isBuffered = event.type !== 'stream:chunk' && event.type !== 'stream:done' |
| 1236 | let eventToSend = event |
| 1237 | if (isBuffered) { |
| 1238 | try { |
| 1239 | const entry = terminalStatus |
| 1240 | ? await eventWriter.writeTerminal(event, terminalStatus) |
| 1241 | : await eventWriter.write(event) |
| 1242 | eventToSend = entry.event |
| 1243 | eventToSend.eventId = entry.eventId |
| 1244 | terminalEventPublished ||= Boolean(terminalStatus) |
| 1245 | } catch (e) { |
| 1246 | // The event buffer (Redis replay store) rejected this event — e.g. the flush |
| 1247 | // batch exceeds the per-write byte cap for large block outputs. The buffer only |
| 1248 | // backs reconnect/replay; the live SSE stream is the primary delivery. Fall |
| 1249 | // through to enqueue the event live (below) instead of throwing, so terminal |
| 1250 | // events still reach the active client and the UI doesn't hang on "running". |
| 1251 | // Marking a terminal event delivered-live as published lets finalization close |
| 1252 | // the stream cleanly instead of aborting it with controller.error(). |
| 1253 | reqLogger.warn('Event buffer write failed; delivering event over live stream only', { |
| 1254 | eventType: event.type, |
| 1255 | terminal: Boolean(terminalStatus), |
| 1256 | error: toError(e).message, |
| 1257 | }) |
| 1258 | terminalEventPublished ||= Boolean(terminalStatus) |
| 1259 | } |
| 1260 | } |
| 1261 | if (!isStreamClosed) { |
| 1262 | try { |
| 1263 | controller.enqueue(encodeSSEEvent(eventToSend)) |
| 1264 | } catch { |
| 1265 | isStreamClosed = true |
| 1266 | } |
| 1267 | } |
| 1268 | } |
| 1269 | |
| 1270 | try { |
| 1271 | const startTime = new Date() |
| 1272 | |
| 1273 | await sendEvent({ |
| 1274 | type: 'execution:started', |
| 1275 | timestamp: startTime.toISOString(), |
| 1276 | executionId, |
| 1277 | workflowId, |
| 1278 | data: { |
| 1279 | startTime: startTime.toISOString(), |
| 1280 | }, |
| 1281 | }) |
nothing calls this directly
no test coverage detected