(
handlers: OutboxHandlerRegistry,
options: { batchSize?: number; maxRuntimeMs?: number; minRemainingMs?: number } = {}
)
| 163 | * multiple workers — `SELECT FOR UPDATE SKIP LOCKED` serializes claims. |
| 164 | */ |
export async function processOutboxEvents(
  handlers: OutboxHandlerRegistry,
  options: { batchSize?: number; maxRuntimeMs?: number; minRemainingMs?: number } = {}
): Promise<ProcessOutboxResult> {
  // Resolve tunables up front; absent values fall back to the defaults.
  const batchSize = options.batchSize ?? 10
  // A falsy maxRuntimeMs (undefined or 0) means "no deadline". The deadline
  // is fixed before reaping, so reaping time counts against the budget.
  const deadline = options.maxRuntimeMs ? Date.now() + options.maxRuntimeMs : undefined
  const minRemainingMs = options.minRemainingMs ?? DEFAULT_HANDLER_TIMEOUT_MS + 5000

  // True while there is no deadline, or while at least `minRemainingMs`
  // remains before it. Checked before every claim so an event is never
  // claimed when the remaining budget is below that threshold.
  const hasTimeBudget = (): boolean => {
    if (!deadline) return true
    return !(Date.now() + minRemainingMs > deadline)
  }

  // Run the reaper once, before any claims are attempted.
  const reaped = await reapStuckProcessingRows()

  // Per-outcome counters for this invocation.
  const tally = { processed: 0, retried: 0, deadLettered: 0, leaseLost: 0 }

  // Claim and handle events one at a time, up to `batchSize` iterations.
  for (let attempt = 0; attempt < batchSize; attempt += 1) {
    if (!hasTimeBudget()) break

    const claimed = await claimBatch(1)
    const event = claimed[0]
    // Nothing was claimed — stop early.
    if (!event) break

    const outcome = await runHandler(event, handlers)
    switch (outcome) {
      case 'completed':
        tally.processed += 1
        break
      case 'dead_letter':
        tally.deadLettered += 1
        break
      case 'lease_lost':
        tally.leaseLost += 1
        break
      default:
        // Any other outcome is counted as a retry.
        tally.retried += 1
    }
  }

  return { ...tally, reaped }
}
| 195 | |
| 196 | /** |
| 197 | * Process a specific outbox event immediately after its surrounding |
no test coverage detected