(
event: ExecutionEvent,
terminalStatus?: TerminalExecutionStreamStatus
)
| 1229 | |
// True once a terminal event has either been written to the event buffer or,
// when that write failed, actually been enqueued on the live stream.
let terminalEventPublished = false

/**
 * Delivers one execution event to the event buffer and the live SSE stream.
 *
 * Events other than `stream:chunk` / `stream:done` are first written through
 * `eventWriter` (`writeTerminal` when `terminalStatus` is given, `write`
 * otherwise); the stored entry's event, stamped with the entry's `eventId`,
 * is what gets sent to the client. The event is then enqueued on `controller`
 * unless the stream is already marked closed.
 *
 * A buffer write failure is logged and does not reject: the event is still
 * offered to the live stream.
 *
 * @param event - The execution event to deliver.
 * @param terminalStatus - When provided, the event is treated as terminal and
 *   written via `eventWriter.writeTerminal`.
 */
const sendEvent = async (
  event: ExecutionEvent,
  terminalStatus?: TerminalExecutionStreamStatus
) => {
  const isBuffered = event.type !== 'stream:chunk' && event.type !== 'stream:done'
  let eventToSend = event
  // Set when the buffer rejected the event, so live delivery is the only delivery.
  let bufferWriteFailed = false
  if (isBuffered) {
    try {
      const entry = terminalStatus
        ? await eventWriter.writeTerminal(event, terminalStatus)
        : await eventWriter.write(event)
      eventToSend = entry.event
      eventToSend.eventId = entry.eventId
      // Stored in the buffer: a terminal event counts as published from here on.
      terminalEventPublished ||= Boolean(terminalStatus)
    } catch (e) {
      // The event buffer (Redis replay store) rejected this event — e.g. the flush
      // batch exceeds the per-write byte cap for large block outputs. The buffer only
      // backs reconnect/replay; the live SSE stream is the primary delivery. Fall
      // through to enqueue the event live (below) instead of throwing, so terminal
      // events still reach the active client and the UI doesn't hang on "running".
      reqLogger.warn('Event buffer write failed; delivering event over live stream only', {
        eventType: event.type,
        terminal: Boolean(terminalStatus),
        error: toError(e).message,
      })
      bufferWriteFailed = true
    }
  }
  if (!isStreamClosed) {
    try {
      controller.enqueue(encodeSSEEvent(eventToSend))
      // Only now has a buffer-rejected terminal event really been delivered live.
      // Marking it published at this point (rather than in the catch above) keeps
      // the flag false when the stream was already closed or enqueue threw, i.e.
      // when the terminal event reached neither the buffer nor the client.
      if (bufferWriteFailed) {
        terminalEventPublished ||= Boolean(terminalStatus)
      }
    } catch {
      // Enqueue (or encoding) failed; treat the stream as closed and stop sending.
      isStreamClosed = true
    }
  }
}
| 1269 | |
| 1270 | try { |
| 1271 | const startTime = new Date() |
no test coverage detected