MCPcopy Index your code
hub / github.com/simstudioai/sim / processOutboxEventById

Function processOutboxEventById

apps/sim/lib/core/outbox/service.ts:201–240  ·  view source on GitHub ↗
(
  eventId: string,
  handlers: OutboxHandlerRegistry
)

Source from the content-addressed store, hash-verified

199 * `FOR UPDATE SKIP LOCKED`, and non-pending rows are left alone.
200 */
201export async function processOutboxEventById(
202 eventId: string,
203 handlers: OutboxHandlerRegistry
204): Promise<ProcessSingleOutboxResult> {
205 const now = new Date()
206 const event = await db.transaction(async (tx) => {
207 const [row] = await tx
208 .select()
209 .from(outboxEvent)
210 .where(eq(outboxEvent.id, eventId))
211 .limit(1)
212 .for('update', { skipLocked: true })
213
214 if (!row) return null
215 if (row.status !== 'pending') return row.status as ProcessSingleOutboxResult
216 if (row.availableAt > now) return 'pending' as const
217
218 await tx
219 .update(outboxEvent)
220 .set({ status: 'processing', lockedAt: now })
221 .where(eq(outboxEvent.id, eventId))
222
223 return {
224 ...row,
225 status: 'processing' as const,
226 lockedAt: now,
227 }
228 })
229
230 if (!event) {
231 const [current] = await db
232 .select({ status: outboxEvent.status })
233 .from(outboxEvent)
234 .where(eq(outboxEvent.id, eventId))
235 .limit(1)
236 return current ? (current.status as ProcessSingleOutboxResult) : 'not_found'
237 }
238 if (typeof event === 'string') return event
239 return runHandler(event, handlers)
240}
241
242/**
243 * Reaper: move `processing` rows whose worker died (stale `lockedAt`)

Callers 1

Calls 3

runHandlerFunction · 0.85
setMethod · 0.65
eqFunction · 0.50

Tested by

no test coverage detected