(opts: LeaderLockOptions<T>)
| 20 | } |
| 21 | |
/**
 * Coordinates a unit of work across concurrent callers that share `key`.
 *
 * The caller that acquires the lock runs `onLeader` and returns its result;
 * the lock is released afterwards whether `onLeader` resolves or throws.
 * Every other caller polls `onFollower` every `pollIntervalMs` until it yields
 * a non-null value or `maxWaitMs` has elapsed.
 *
 * If acquiring the lock itself throws, the failure is logged and `onLeader`
 * is run without any coordination rather than failing the call.
 *
 * @param opts - Lock key, timing overrides (`ttlSec`, `pollIntervalMs`,
 *   `maxWaitMs`; each falls back to its module default) and the
 *   `onLeader` / `onFollower` callbacks.
 * @returns The value produced by `onLeader` or by a successful `onFollower`
 *   poll, or `null` when a follower exhausts `maxWaitMs` without a result.
 * @throws Whatever `onLeader` or `onFollower` throws; only lock
 *   acquisition and release errors are caught here.
 */
export async function withLeaderLock<T>(opts: LeaderLockOptions<T>): Promise<T | null> {
  const {
    key,
    ttlSec = DEFAULT_TTL_SEC,
    pollIntervalMs = DEFAULT_POLL_INTERVAL_MS,
    maxWaitMs = DEFAULT_MAX_WAIT_MS,
    onLeader,
    onFollower,
  } = opts

  // Passed to both acquire and release so the release call can identify this caller's lock.
  const ownerToken = generateShortId()

  let acquired = false
  try {
    acquired = await acquireLock(key, ownerToken, ttlSec)
  } catch (error) {
    // Deliberate best-effort: a failing lock call must not block the work itself.
    logger.warn('Lock acquisition failed; running leader path uncoordinated', {
      key,
      error: toError(error).message,
    })
    return onLeader()
  }

  if (acquired) {
    try {
      return await onLeader()
    } finally {
      try {
        await releaseLock(key, ownerToken)
      } catch (error) {
        // A failed release is not fatal; it is logged and must not mask onLeader's outcome.
        logger.warn('Lock release failed (will expire via TTL)', {
          key,
          error: toError(error).message,
        })
      }
    }
  }

  const deadline = Date.now() + maxWaitMs
  let remainingMs = deadline - Date.now()
  while (remainingMs > 0) {
    // Never sleep past the deadline: a poll interval larger than the remaining
    // budget would otherwise stretch the total wait well beyond maxWaitMs.
    await sleep(Math.min(pollIntervalMs, remainingMs))
    const value = await onFollower()
    if (value !== null) return value
    remainingMs = deadline - Date.now()
  }

  // The leader may have persisted between our final poll and now; one last check.
  // This is also the only poll performed when maxWaitMs <= 0.
  const lastChance = await onFollower()
  if (lastChance !== null) return lastChance

  logger.warn('Follower timed out waiting for leader', { key, maxWaitMs })
  return null
}
no test coverage detected