MCPcopy
hub / github.com/simstudioai/sim / processOutboxEvents

Function processOutboxEvents

apps/sim/lib/core/outbox/service.ts:165–194  ·  view source on GitHub ↗
(
  handlers: OutboxHandlerRegistry,
  options: { batchSize?: number; maxRuntimeMs?: number; minRemainingMs?: number } = {}
)

Source from the content-addressed store, hash-verified

163 * multiple workers — `SELECT FOR UPDATE SKIP LOCKED` serializes claims.
164 */
export async function processOutboxEvents(
  handlers: OutboxHandlerRegistry,
  options: { batchSize?: number; maxRuntimeMs?: number; minRemainingMs?: number } = {}
): Promise<ProcessOutboxResult> {
  // Upper bound on the number of events claimed by this invocation.
  const maxEvents = options.batchSize ?? 10
  // Wall-clock cutoff; stays undefined (no cutoff) when maxRuntimeMs is unset or 0.
  const stopAt = options.maxRuntimeMs ? Date.now() + options.maxRuntimeMs : undefined
  // Time that must still be left before the cutoff for another event to be claimed.
  const reserveMs = options.minRemainingMs ?? DEFAULT_HANDLER_TIMEOUT_MS + 5000

  // Runs once, before any event is claimed; its result is passed through
  // unchanged as `reaped`.
  const reaped = await reapStuckProcessingRows()

  const tally = { processed: 0, retried: 0, deadLettered: 0, leaseLost: 0 }

  let attempts = 0
  while (attempts < maxEvents) {
    // Stop early instead of starting work that might overrun the cutoff.
    const outOfTime = stopAt && Date.now() + reserveMs > stopAt
    if (outOfTime) break

    // Events are claimed one at a time, immediately before being handled.
    const [next] = await claimBatch(1)
    if (!next) break

    const outcome = await runHandler(next, handlers)
    switch (outcome) {
      case 'completed':
        tally.processed++
        break
      case 'dead_letter':
        tally.deadLettered++
        break
      case 'lease_lost':
        tally.leaseLost++
        break
      default:
        // Every other outcome is counted as a retry.
        tally.retried++
    }

    attempts++
  }

  return { ...tally, reaped }
}
195
196/**
197 * Process a specific outbox event immediately after its surrounding

Callers 2

service.test.ts · File · 0.90
route.ts · File · 0.90

Calls 3

reapStuckProcessingRows · Function · 0.85
claimBatch · Function · 0.85
runHandler · Function · 0.85

Tested by

no test coverage detected