hub / github.com/simstudioai/sim / claimBatch

Function claimBatch

apps/sim/lib/core/outbox/service.ts:273–305

Phase 1: claim a batch of due pending events.

`SELECT ... FOR UPDATE SKIP LOCKED` atomically picks rows that no other worker is currently looking at. We then flip those rows to `processing` inside the same tx so the claim survives the lock release: the status change becomes the out-of-band mutual exclusion.
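The claim semantics described above can be illustrated without a database. The following is a minimal in-memory sketch, not the Drizzle/Postgres implementation: `OutboxRow`, `InMemoryOutbox`, and its methods are hypothetical names introduced only for illustration, and the `locked` set stands in for Postgres row locks.

```typescript
// In-memory model of the claim phase. All names here are hypothetical;
// the real function relies on Postgres row locks, not a Set.
type Status = 'pending' | 'processing'

interface OutboxRow {
  id: number
  status: Status
  availableAt: Date
  createdAt: Date
  lockedAt: Date | null
}

class InMemoryOutbox {
  private rows: OutboxRow[]
  // Ids currently "row-locked" by a claimer that has not committed yet.
  private locked = new Set<number>()

  constructor(rows: OutboxRow[]) {
    this.rows = rows
  }

  // Mimics SELECT ... FOR UPDATE SKIP LOCKED: due pending rows, oldest
  // first, skipping any row another claimer currently holds a lock on.
  selectForUpdateSkipLocked(batchSize: number, now: Date): OutboxRow[] {
    const picked = this.rows
      .filter(
        (r) =>
          r.status === 'pending' &&
          r.availableAt.getTime() <= now.getTime() &&
          !this.locked.has(r.id)
      )
      .sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime())
      .slice(0, batchSize)
    for (const r of picked) this.locked.add(r.id)
    return picked
  }

  // Mimics the UPDATE followed by COMMIT: flip the status, stamp the
  // lease time, then release the locks.
  markProcessingAndCommit(picked: OutboxRow[], now: Date): void {
    for (const r of picked) {
      r.status = 'processing'
      r.lockedAt = now
      this.locked.delete(r.id)
    }
  }
}

const t0 = new Date('2024-01-01T00:00:00Z')
const outbox = new InMemoryOutbox(
  [1, 2, 3, 4].map(
    (id): OutboxRow => ({
      id,
      status: 'pending',
      availableAt: t0,
      createdAt: new Date(t0.getTime() + id),
      lockedAt: null,
    })
  )
)

const now = new Date(t0.getTime() + 1000)
// Two workers select before either commits; the second skips locked rows.
const batchA = outbox.selectForUpdateSkipLocked(2, now)
const batchB = outbox.selectForUpdateSkipLocked(2, now)
outbox.markProcessingAndCommit(batchA, now)
outbox.markProcessingAndCommit(batchB, now)
// After both commits no row is `pending`, so a later claim is empty.
const batchC = outbox.selectForUpdateSkipLocked(2, now)
```

In this model the two workers receive disjoint batches while the locks are held, and the `processing` status keeps the rows from being claimed again once the locks are gone, which is the role the description assigns to the status flip.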

(batchSize: number)

Source from the content-addressed store, hash-verified

 * release: the status change becomes the out-of-band mutual exclusion.
 */
async function claimBatch(batchSize: number): Promise<(typeof outboxEvent.$inferSelect)[]> {
  const now = new Date()
  return db.transaction(async (tx) => {
    const rows = await tx
      .select()
      .from(outboxEvent)
      .where(and(eq(outboxEvent.status, 'pending'), lte(outboxEvent.availableAt, now)))
      .orderBy(asc(outboxEvent.createdAt))
      .limit(batchSize)
      .for('update', { skipLocked: true })

    if (rows.length === 0) return []

    await tx
      .update(outboxEvent)
      .set({ status: 'processing', lockedAt: now })
      .where(
        inArray(
          outboxEvent.id,
          rows.map((r) => r.id)
        )
      )

    // Return rows with the claim state we just committed. `lockedAt`
    // on this object is the authoritative lease timestamp used by the
    // terminal-update lease CAS (see `runHandler`).
    return rows.map((row) => ({
      ...row,
      status: 'processing' as const,
      lockedAt: now,
    }))
  })
}

/**
 * Phase 2: invoke the handler for a claimed event, outside any DB
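The comment inside `claimBatch` says the returned `lockedAt` is the lease timestamp used by a terminal-update compare-and-set in `runHandler`, which is not shown in this excerpt. As a rough illustration of that idea only, the sketch below models a lease CAS in memory. `LeasedRow`, `completeIfLeaseHeld`, and the `completed` status are assumptions made for this example, as is the scenario in which an expired lease lets another worker re-claim the row.

```typescript
// Hypothetical in-memory sketch of a lease compare-and-set. This is not
// the actual `runHandler` code; names and the 'completed' status are assumed.
interface LeasedRow {
  id: number
  status: 'pending' | 'processing' | 'completed'
  lockedAt: Date | null
}

// Applies the terminal status only if the row is still `processing` under
// the exact lease timestamp the caller claimed with. Returns whether the
// update was applied.
function completeIfLeaseHeld(row: LeasedRow, claimedAt: Date): boolean {
  const leaseHeld =
    row.status === 'processing' &&
    row.lockedAt !== null &&
    row.lockedAt.getTime() === claimedAt.getTime()
  if (!leaseHeld) return false
  row.status = 'completed'
  row.lockedAt = null
  return true
}

const firstClaim = new Date('2024-01-01T00:00:00Z')
const secondClaim = new Date('2024-01-01T00:10:00Z')
const row: LeasedRow = { id: 1, status: 'processing', lockedAt: firstClaim }

// Assumed scenario: the first lease expired and another worker re-claimed
// the row, stamping a newer lease timestamp.
row.lockedAt = secondClaim

// The stale worker's write is rejected; the current lease holder's succeeds.
const staleResult = completeIfLeaseHeld(row, firstClaim)
const freshResult = completeIfLeaseHeld(row, secondClaim)
```

In SQL terms, a guard of this kind would typically be expressed as an extra `lockedAt` equality condition in the `WHERE` clause of the terminal `UPDATE`, so a worker holding an outdated lease updates zero rows.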

Callers 1

processOutboxEvents (Function) · 0.85

Calls 2

set (Method) · 0.65
eq (Function) · 0.50

Tested by

no test coverage detected