(
executionId: string,
context: ExecutionEventWriterContext = {}
)
| 718 | } |
| 719 | |
| 720 | export function createExecutionEventWriter( |
| 721 | executionId: string, |
| 722 | context: ExecutionEventWriterContext = {} |
| 723 | ): ExecutionEventWriter { |
| 724 | const redis = getRedisClient() |
| 725 | if (!redis) { |
| 726 | if (canUseMemoryEventBuffer()) { |
| 727 | logger.info('createExecutionEventWriter: using in-memory event buffer', { executionId }) |
| 728 | return createMemoryExecutionEventWriter(executionId, context) |
| 729 | } |
| 730 | logger.warn( |
| 731 | 'createExecutionEventWriter: Redis client unavailable, events will not be buffered', |
| 732 | { |
| 733 | executionId, |
| 734 | } |
| 735 | ) |
| 736 | return { |
| 737 | write: async (event) => ({ eventId: 0, executionId, event }), |
| 738 | writeTerminal: async () => { |
| 739 | throw new Error(`Execution event buffer unavailable for ${executionId}`) |
| 740 | }, |
| 741 | flush: async () => {}, |
| 742 | close: async () => {}, |
| 743 | } |
| 744 | } |
| 745 | |
| 746 | let pending: ExecutionEventEntry[] = [] |
| 747 | let nextEventId = 0 |
| 748 | let maxReservedId = 0 |
| 749 | let flushTimer: ReturnType<typeof setTimeout> | null = null |
| 750 | let consecutiveFlushFailures = 0 |
| 751 | |
| 752 | const getFlushDelayMs = () => { |
| 753 | if (consecutiveFlushFailures === 0) return FLUSH_INTERVAL_MS |
| 754 | const backoff = Math.min( |
| 755 | FLUSH_INTERVAL_MS * 2 ** Math.min(consecutiveFlushFailures, 6), |
| 756 | FLUSH_MAX_RETRY_INTERVAL_MS |
| 757 | ) |
| 758 | return backoff + randomInt(0, FLUSH_INTERVAL_MS) |
| 759 | } |
| 760 | |
| 761 | const scheduleFlush = (delayMs = FLUSH_INTERVAL_MS) => { |
| 762 | if (flushTimer) return |
| 763 | flushTimer = setTimeout(() => { |
| 764 | flushTimer = null |
| 765 | void flushPending() |
| 766 | }, delayMs) |
| 767 | } |
| 768 | |
| 769 | const reserveIds = async (minCount: number) => { |
| 770 | const reserveCount = Math.max(RESERVE_BATCH, minCount) |
| 771 | const newMax = await redis.incrby(getSeqKey(executionId), reserveCount) |
| 772 | const startId = newMax - reserveCount + 1 |
| 773 | if (nextEventId === 0 || nextEventId > maxReservedId) { |
| 774 | nextEventId = startId |
| 775 | maxReservedId = newMax |
| 776 | } |
| 777 | } |
no test coverage detected