MCPcopy Index your code
hub / github.com/simstudioai/sim / createExecutionEventWriter

Function createExecutionEventWriter

apps/sim/lib/execution/event-buffer.ts:720–1014  ·  view source on GitHub ↗
(
  executionId: string,
  context: ExecutionEventWriterContext = {}
)

Source from the content-addressed store, hash-verified

718}
719
720export function createExecutionEventWriter(
721 executionId: string,
722 context: ExecutionEventWriterContext = {}
723): ExecutionEventWriter {
724 const redis = getRedisClient()
725 if (!redis) {
726 if (canUseMemoryEventBuffer()) {
727 logger.info('createExecutionEventWriter: using in-memory event buffer', { executionId })
728 return createMemoryExecutionEventWriter(executionId, context)
729 }
730 logger.warn(
731 'createExecutionEventWriter: Redis client unavailable, events will not be buffered',
732 {
733 executionId,
734 }
735 )
736 return {
737 write: async (event) => ({ eventId: 0, executionId, event }),
738 writeTerminal: async () => {
739 throw new Error(`Execution event buffer unavailable for ${executionId}`)
740 },
741 flush: async () => {},
742 close: async () => {},
743 }
744 }
745
746 let pending: ExecutionEventEntry[] = []
747 let nextEventId = 0
748 let maxReservedId = 0
749 let flushTimer: ReturnType<typeof setTimeout> | null = null
750 let consecutiveFlushFailures = 0
751
752 const getFlushDelayMs = () => {
753 if (consecutiveFlushFailures === 0) return FLUSH_INTERVAL_MS
754 const backoff = Math.min(
755 FLUSH_INTERVAL_MS * 2 ** Math.min(consecutiveFlushFailures, 6),
756 FLUSH_MAX_RETRY_INTERVAL_MS
757 )
758 return backoff + randomInt(0, FLUSH_INTERVAL_MS)
759 }
760
761 const scheduleFlush = (delayMs = FLUSH_INTERVAL_MS) => {
762 if (flushTimer) return
763 flushTimer = setTimeout(() => {
764 flushTimer = null
765 void flushPending()
766 }, delayMs)
767 }
768
769 const reserveIds = async (minCount: number) => {
770 const reserveCount = Math.max(RESERVE_BATCH, minCount)
771 const newMax = await redis.incrby(getSeqKey(executionId), reserveCount)
772 const startId = newMax - reserveCount + 1
773 if (nextEventId === 0 || nextEventId > maxReservedId) {
774 nextEventId = startId
775 maxReservedId = newMax
776 }
777 }

Callers 4

runResumeExecutionMethod · 0.90
handleExecutePostFunction · 0.90

Calls 6

getRedisClientFunction · 0.90
canUseMemoryEventBufferFunction · 0.85
infoMethod · 0.80
warnMethod · 0.65
resolveMethod · 0.65

Tested by

no test coverage detected