( tableId: string, rowId: string, ownerId: string, fn: () => Promise<T> )
| 34 | * doesn't prevent duplicate workflow executions. |
| 35 | */ |
export async function withCascadeLock<T>(
  tableId: string,
  rowId: string,
  ownerId: string,
  fn: () => Promise<T>
): Promise<{ status: 'acquired'; result: T } | { status: 'contended' }> {
  const lockKey = cascadeLockKey(tableId, rowId)

  // Bail out without running `fn` when the lock could not be taken.
  const gotLock = await acquireLock(lockKey, ownerId, LOCK_TTL_SECONDS)
  if (!gotLock) {
    return { status: 'contended' }
  }

  // While `fn` runs, re-extend the lock with the same TTL on every tick.
  // A failed refresh is only logged; it does not interrupt `fn`.
  const refreshLock = (): void => {
    extendLock(lockKey, ownerId, LOCK_TTL_SECONDS).catch((err) => {
      logger.warn(`Heartbeat refresh failed for ${lockKey}`, { error: toError(err).message })
    })
  }
  const heartbeatTimer = setInterval(refreshLock, HEARTBEAT_INTERVAL_MS)

  try {
    return { status: 'acquired', result: await fn() }
  } finally {
    // Runs whether `fn` resolved or threw: stop the heartbeat first, then
    // release. A rejected release is logged rather than propagated, so it
    // never replaces the outcome of `fn`.
    clearInterval(heartbeatTimer)
    await releaseLock(lockKey, ownerId).catch((err) => {
      logger.warn(`Lock release failed for ${lockKey}`, { error: toError(err).message })
    })
  }
}
no test coverage detected