( ownerKey: string, leaseId: string, timeoutMs: number )
| 345 | type LeaseAcquireResult = 'acquired' | 'limit_exceeded' | 'unavailable' |
| 346 | |
| 347 | async function tryAcquireDistributedLease( |
| 348 | ownerKey: string, |
| 349 | leaseId: string, |
| 350 | timeoutMs: number |
| 351 | ): Promise<LeaseAcquireResult> { |
| 352 | // Redis not configured: explicit local-mode fallback is allowed. |
| 353 | if (!env.REDIS_URL) return 'acquired' |
| 354 | |
| 355 | const redis = getRedisClient() |
| 356 | if (!redis) { |
| 357 | logger.error('Redis is configured but unavailable for distributed lease acquisition', { |
| 358 | ownerKey, |
| 359 | }) |
| 360 | return 'unavailable' |
| 361 | } |
| 362 | |
| 363 | const now = Date.now() |
| 364 | const leaseTtlMs = Math.max( |
| 365 | timeoutMs + QUEUE_TIMEOUT_MS + DISTRIBUTED_LEASE_GRACE_MS, |
| 366 | DISTRIBUTED_LEASE_MIN_TTL_MS |
| 367 | ) |
| 368 | const expiresAt = now + leaseTtlMs |
| 369 | const key = ownerRedisKey(ownerKey) |
| 370 | |
| 371 | const script = ` |
| 372 | redis.call('ZREMRANGEBYSCORE', KEYS[1], '-inf', ARGV[1]) |
| 373 | local current = redis.call('ZCARD', KEYS[1]) |
| 374 | if current >= tonumber(ARGV[2]) then |
| 375 | return 0 |
| 376 | end |
| 377 | redis.call('ZADD', KEYS[1], ARGV[3], ARGV[4]) |
| 378 | redis.call('PEXPIRE', KEYS[1], ARGV[5]) |
| 379 | return 1 |
| 380 | ` |
| 381 | |
| 382 | let deadlineTimer: NodeJS.Timeout | undefined |
| 383 | const deadline = new Promise<never>((_, reject) => { |
| 384 | deadlineTimer = setTimeout( |
| 385 | () => reject(new Error(`Redis lease timed out after ${LEASE_REDIS_DEADLINE_MS}ms`)), |
| 386 | LEASE_REDIS_DEADLINE_MS |
| 387 | ) |
| 388 | }) |
| 389 | |
| 390 | try { |
| 391 | const result = await Promise.race([ |
| 392 | redis.eval( |
| 393 | script, |
| 394 | 1, |
| 395 | key, |
| 396 | now.toString(), |
| 397 | DISTRIBUTED_MAX_INFLIGHT_PER_OWNER.toString(), |
| 398 | expiresAt.toString(), |
| 399 | leaseId, |
| 400 | leaseTtlMs.toString() |
| 401 | ), |
| 402 | deadline, |
| 403 | ]) |
| 404 | return Number(result) === 1 ? 'acquired' : 'limit_exceeded' |
no test coverage detected