( eventId: string, handlers: OutboxHandlerRegistry )
| 199 | * `FOR UPDATE SKIP LOCKED`, and non-pending rows are left alone. |
| 200 | */ |
export async function processOutboxEventById(
  eventId: string,
  handlers: OutboxHandlerRegistry
): Promise<ProcessSingleOutboxResult> {
  // One timestamp is used both for the availability check and as the lock time.
  const now = new Date()
  // Fresh predicate per query: every statement below targets this one event id.
  const byEventId = () => eq(outboxEvent.id, eventId)

  // Claim phase: lock the row, validate it, and flip it to `processing`
  // inside a single transaction.
  const claim = await db.transaction(async (tx) => {
    const locked = await tx
      .select()
      .from(outboxEvent)
      .where(byEventId())
      .limit(1)
      .for('update', { skipLocked: true })
    const row = locked[0]

    // Nothing selected: with SKIP LOCKED this is either an unknown id or a
    // row currently locked elsewhere; the caller-side re-read below decides.
    if (!row) {
      return null
    }
    // Rows that are not pending are reported by their status and not modified.
    if (row.status !== 'pending') {
      return row.status as ProcessSingleOutboxResult
    }
    // Pending but scheduled for later: leave it untouched.
    if (row.availableAt > now) {
      return 'pending' as const
    }

    const claimedFields = { status: 'processing' as const, lockedAt: now }
    await tx.update(outboxEvent).set(claimedFields).where(byEventId())

    // Hand back the row as it now looks after the update.
    return { ...row, ...claimedFields }
  })

  if (claim) {
    // A string is a status short-circuit; an object is the claimed row to run.
    return typeof claim === 'string' ? claim : runHandler(claim, handlers)
  }

  // No row was obtained under lock: re-read the status without locking to
  // tell a missing event apart from one that exists but could not be locked.
  const statusRows = await db
    .select({ status: outboxEvent.status })
    .from(outboxEvent)
    .where(byEventId())
    .limit(1)
  const current = statusRows[0]
  if (!current) {
    return 'not_found'
  }
  return current.status as ProcessSingleOutboxResult
}
| 241 | |
| 242 | /** |
| 243 | * Reaper: move `processing` rows whose worker died (stale `lockedAt`) |
no test coverage detected