* Phase 1: claim a batch of due pending events. * * `SELECT ... FOR UPDATE SKIP LOCKED` atomically picks rows that no * other worker is currently looking at. We then flip those rows to * `processing` inside the same tx so the claim survives the lock * release — the status change becomes the out
(batchSize: number)
| 271 | * release — the status change becomes the out-of-band mutual exclusion. |
| 272 | */ |
| 273 | async function claimBatch(batchSize: number): Promise<(typeof outboxEvent.$inferSelect)[]> { |
| 274 | const now = new Date() |
| 275 | return db.transaction(async (tx) => { |
| 276 | const rows = await tx |
| 277 | .select() |
| 278 | .from(outboxEvent) |
| 279 | .where(and(eq(outboxEvent.status, 'pending'), lte(outboxEvent.availableAt, now))) |
| 280 | .orderBy(asc(outboxEvent.createdAt)) |
| 281 | .limit(batchSize) |
| 282 | .for('update', { skipLocked: true }) |
| 283 | |
| 284 | if (rows.length === 0) return [] |
| 285 | |
| 286 | await tx |
| 287 | .update(outboxEvent) |
| 288 | .set({ status: 'processing', lockedAt: now }) |
| 289 | .where( |
| 290 | inArray( |
| 291 | outboxEvent.id, |
| 292 | rows.map((r) => r.id) |
| 293 | ) |
| 294 | ) |
| 295 | |
| 296 | // Return rows with the claim state we just committed. `lockedAt` |
| 297 | // on this object is the authoritative lease timestamp used by the |
| 298 | // terminal-update lease CAS (see `runHandler`). |
| 299 | return rows.map((row) => ({ |
| 300 | ...row, |
| 301 | status: 'processing' as const, |
| 302 | lockedAt: now, |
| 303 | })) |
| 304 | }) |
| 305 | } |
| 306 | |
| 307 | /** |
| 308 | * Phase 2: invoke the handler for a claimed event, outside any DB |
no test coverage detected