(lockId: AdvisoryLockId)
| 56 | * Use handle.release() to release the lock. |
| 57 | */ |
| 58 | export async function tryAcquireAdvisoryLock(lockId: AdvisoryLockId): Promise<{ |
| 59 | acquired: boolean |
| 60 | handle: LockHandle | null |
| 61 | }> { |
| 62 | logLock('info', 'Attempting to acquire advisory lock', { lockId }) |
| 63 | |
| 64 | const connection = postgres(env.DATABASE_URL, { |
| 65 | max: 1, |
| 66 | idle_timeout: 0, |
| 67 | connect_timeout: 10, |
| 68 | max_lifetime: 0, // Disable connection recycling - must keep session alive for advisory lock |
| 69 | }) |
| 70 | |
| 71 | try { |
| 72 | logLock('info', 'Database connection established, attempting pg_try_advisory_lock') |
| 73 | const result = await connection`SELECT pg_try_advisory_lock(${lockId}) as acquired` |
| 74 | const acquired = coerceBool(result[0]?.acquired) |
| 75 | |
| 76 | logLock('info', 'Lock acquisition result', { acquired, lockId }) |
| 77 | |
| 78 | if (!acquired) { |
| 79 | logLock('info', 'Lock not acquired (held by another process), closing connection') |
| 80 | await connection.end() |
| 81 | return { acquired: false, handle: null } |
| 82 | } |
| 83 | |
| 84 | logLock('info', 'Lock acquired successfully, setting up lock handle', { lockId }) |
| 85 | |
| 86 | // Create the lock handle |
| 87 | let lostCallback: (() => void) | null = null |
| 88 | let isReleased = false |
| 89 | let lostTriggered = false // Track if lost was triggered before callback registered |
| 90 | let healthCheckTimer: ReturnType<typeof setInterval> | null = null |
| 91 | let healthCheckCount = 0 |
| 92 | let healthCheckInFlight = false // Guard against stacking health checks |
| 93 | |
| 94 | const triggerLost = () => { |
| 95 | if (isReleased || lostTriggered) return |
| 96 | lostTriggered = true |
| 97 | logLock('warn', 'Lock lost detected, triggering lost callback', { lockId, healthCheckCount }) |
| 98 | if (healthCheckTimer) { |
| 99 | clearInterval(healthCheckTimer) |
| 100 | healthCheckTimer = null |
| 101 | } |
| 102 | // Close the connection before marking as released |
| 103 | connection.end().catch(() => {}) |
| 104 | isReleased = true |
| 105 | if (lostCallback) { |
| 106 | lostCallback() |
| 107 | } |
| 108 | } |
| 109 | |
| 110 | // Start health check interval - verify we still hold the lock, not just connection liveness |
| 111 | healthCheckTimer = setInterval(async () => { |
| 112 | if (isReleased || healthCheckInFlight) return |
| 113 | healthCheckInFlight = true |
| 114 | healthCheckCount++ |
| 115 | try { |
no test coverage detected