* Reaper: move `processing` rows whose worker died (stale `lockedAt`) * back to `pending` so another worker can pick them up. Without this, * a SIGKILL between claim and result-write would permanently strand * the row in `processing`.
()
| 246 | * the row in `processing`. |
| 247 | */ |
| 248 | async function reapStuckProcessingRows(): Promise<number> { |
| 249 | const stuckBefore = new Date(Date.now() - STUCK_PROCESSING_THRESHOLD_MS) |
| 250 | const result = await db |
| 251 | .update(outboxEvent) |
| 252 | .set({ status: 'pending', lockedAt: null }) |
| 253 | .where(and(eq(outboxEvent.status, 'processing'), lte(outboxEvent.lockedAt, stuckBefore))) |
| 254 | .returning({ id: outboxEvent.id }) |
| 255 | |
| 256 | if (result.length > 0) { |
| 257 | logger.warn('Reaped stuck outbox processing rows', { |
| 258 | count: result.length, |
| 259 | thresholdMs: STUCK_PROCESSING_THRESHOLD_MS, |
| 260 | }) |
| 261 | } |
| 262 | return result.length |
| 263 | } |
| 264 | |
| 265 | /** |
| 266 | * Phase 1: claim a batch of due pending events. |
no test coverage detected