* Phase 2: invoke the handler for a claimed event, outside any DB transaction, then transition the row to its terminal or retry state. Every terminal UPDATE is guarded by a lease CAS (`WHERE status = 'processing' AND locked_at = event.lockedAt`). This defends against the "slow handler …
( event: typeof outboxEvent.$inferSelect, handlers: OutboxHandlerRegistry )
| 317 | * rowCount is 0 — and we log+skip instead of clobbering the new lease. |
| 318 | */ |
| 319 | async function runHandler( |
| 320 | event: typeof outboxEvent.$inferSelect, |
| 321 | handlers: OutboxHandlerRegistry |
| 322 | ): Promise<'completed' | 'pending' | 'dead_letter' | 'lease_lost'> { |
| 323 | const handler = handlers[event.eventType] |
| 324 | |
| 325 | if (!handler) { |
| 326 | logger.error('No handler registered for outbox event type', { |
| 327 | eventId: event.id, |
| 328 | eventType: event.eventType, |
| 329 | }) |
| 330 | await updateIfLeaseHeld(event, { |
| 331 | status: 'dead_letter', |
| 332 | lastError: `No handler registered for event type '${event.eventType}'`, |
| 333 | processedAt: new Date(), |
| 334 | lockedAt: null, |
| 335 | }) |
| 336 | return 'dead_letter' |
| 337 | } |
| 338 | |
| 339 | try { |
| 340 | await runHandlerWithTimeout(handler, event) |
| 341 | const updated = await updateIfLeaseHeld(event, { |
| 342 | status: 'completed', |
| 343 | processedAt: new Date(), |
| 344 | lockedAt: null, |
| 345 | }) |
| 346 | if (!updated) { |
| 347 | logger.warn('Outbox event completion skipped — lease lost (reaped + reclaimed)', { |
| 348 | eventId: event.id, |
| 349 | eventType: event.eventType, |
| 350 | }) |
| 351 | return 'lease_lost' |
| 352 | } |
| 353 | logger.info('Outbox event processed', { |
| 354 | eventId: event.id, |
| 355 | eventType: event.eventType, |
| 356 | attempts: event.attempts + 1, |
| 357 | }) |
| 358 | return 'completed' |
| 359 | } catch (error) { |
| 360 | if (error instanceof OutboxHandlerTimeoutError) { |
| 361 | return recordTimedOutAttempt(event, error.message) |
| 362 | } |
| 363 | |
| 364 | const nextAttempts = event.attempts + 1 |
| 365 | const isDead = nextAttempts >= event.maxAttempts |
| 366 | const errMsg = toError(error).message |
| 367 | |
| 368 | if (isDead) { |
| 369 | const updated = await updateIfLeaseHeld(event, { |
| 370 | attempts: nextAttempts, |
| 371 | status: 'dead_letter', |
| 372 | lastError: errMsg, |
| 373 | processedAt: new Date(), |
| 374 | lockedAt: null, |
| 375 | }) |
| 376 | if (!updated) { |
no test coverage detected