(
event: ExecutionEvent,
status: TerminalExecutionStreamStatus
)
| 938 | } |
| 939 | |
| 940 | /** Queues a terminal event behind writeQueue, assigns it the next event id, flushes it with the given terminal status, and sets `closed` once that flush succeeds. Resolves with the written entry; rejects if a queued step throws or flushPending reports failure. */ const writeTerminal = ( |
| 941 | event: ExecutionEvent, |
| 942 | status: TerminalExecutionStreamStatus |
| 943 | ): Promise<ExecutionEventEntry> => { |
| 944 | if (closed) return Promise.resolve({ eventId: 0, executionId, event }) /* already closed: resolve at once with eventId 0 and the original, uncompacted event; nothing is queued or flushed */ |
| 945 | const p = writeQueue.then(async () => { /* body runs only after writeQueue fulfills. NOTE(review): `closed` is checked before queueing, not in here, so a second call made while the first is still pending will also run; confirm that is intended */ |
| 946 | if (flushTimer) { |
| 947 | clearTimeout(flushTimer) /* drop the outstanding timer; the flush below is awaited explicitly */ |
| 948 | flushTimer = null |
| 949 | } |
| 950 | if (nextEventId === 0 || nextEventId > maxReservedId) { /* no usable id left (0 presumably means nothing reserved yet; confirm) */ |
| 951 | await reserveIds(1) |
| 952 | } |
| 953 | const eventId = nextEventId++ |
| 954 | const compactEvent = await compactEventForBuffer(event, { /* NOTE(review): the id above is consumed before this await, so a throw here rejects p and leaves that id unused; confirm gaps are acceptable */ |
| 955 | ...context, |
| 956 | executionId, |
| 957 | requireDurablePayloads: true, |
| 958 | }) |
| 959 | const entry: ExecutionEventEntry = { eventId, executionId, event: compactEvent } |
| 960 | pending.push(entry) |
| 961 | const ok = await flushPending(false, status) /* presumably writes everything in `pending` together with the terminal status; helper is not visible here, confirm */ |
| 962 | if (!ok) { |
| 963 | pending = pending.filter((pendingEntry) => pendingEntry !== entry) /* flush failed: take this entry back out of pending before rejecting */ |
| 964 | throw new Error(`Failed to flush terminal execution event for ${executionId}`) |
| 965 | } |
| 966 | closed = true /* set only after a successful flush, so later calls take the early return at the top */ |
| 967 | return entry |
| 968 | }) |
| 969 | writeQueue = p.then( /* both handlers return normally, so writeQueue fulfills whether p resolved or rejected; the outcome is recorded in writeFailure instead */ |
| 970 | () => { |
| 971 | writeFailure = null |
| 972 | }, |
| 973 | (error) => { |
| 974 | writeFailure = toError(error) |
| 975 | } |
| 976 | ) |
| 977 | inflightWrites.add(p) |
| 978 | const remove = () => inflightWrites.delete(p) |
| 979 | p.then(remove, remove) /* p leaves inflightWrites as soon as it settles, on success or failure */ |
| 980 | return p /* callers receive p itself, so a failed terminal write surfaces to them as a rejection */ |
| 981 | } |
| 982 | |
| 983 | const close = async () => { |
| 984 | closed = true |
nothing calls this directly
no test coverage detected