| 265 | * @returns Object containing the transaction result and lock wait time in milliseconds |
| 266 | */ |
| 267 | export async function withAdvisoryLockTransaction<T>({ |
| 268 | callback, |
| 269 | lockKey, |
| 270 | context = {}, |
| 271 | logger, |
| 272 | lockTimeoutMs = ADVISORY_LOCK_TIMEOUT_MS, |
| 273 | }: { |
| 274 | callback: TransactionCallback<T> |
| 275 | lockKey: string |
| 276 | context: Record<string, unknown> |
| 277 | logger: Logger |
| 278 | lockTimeoutMs?: number |
| 279 | }): Promise<AdvisoryLockTransactionResult<T>> { |
| 280 | // Validate lock key to prevent bugs from null/empty keys |
| 281 | if (!lockKey || typeof lockKey !== 'string' || lockKey.trim() === '') { |
| 282 | throw new Error('lockKey must be a non-empty string') |
| 283 | } |
| 284 | |
| 285 | return await withRetry( |
| 286 | async () => { |
| 287 | return await db.transaction( |
| 288 | async (tx) => { |
| 289 | // Set a statement timeout to prevent indefinite blocking if a lock holder hangs. |
| 290 | // This timeout applies to the lock acquisition and subsequent statements. |
| 291 | await tx.execute( |
| 292 | sql`SET LOCAL statement_timeout = ${sql.raw(lockTimeoutMs.toString())}`, |
| 293 | ) |
| 294 | |
| 295 | // Acquire advisory lock - blocks until lock is available (or timeout). |
| 296 | // We use MD5 to generate a 60-bit hash, dramatically reducing collision probability |
| 297 | // compared to hashtext() which only produces 32 bits. |
| 298 | // left(md5(key), 15) gives 15 hex chars (60 bits), which fits in a signed 64-bit bigint. |
| 299 | const lockStart = Date.now() |
| 300 | await tx.execute( |
| 301 | sql`SELECT pg_advisory_xact_lock(('x' || left(md5(${lockKey}), 15))::bit(60)::bigint)`, |
| 302 | ) |
| 303 | const lockWaitMs = Date.now() - lockStart |
| 304 | |
| 305 | // Log at WARN level only for significant waits (3+ seconds) to avoid excessive logging |
| 306 | if (lockWaitMs > SIGNIFICANT_LOCK_WAIT_MS) { |
| 307 | logger.warn( |
| 308 | { ...context, lockKey, lockWaitMs }, |
| 309 | `Advisory lock contention: waited ${(lockWaitMs / 1000).toFixed(1)}s for lock`, |
| 310 | ) |
| 311 | |
| 312 | // Track in PostHog for analytics |
| 313 | trackEvent({ |
| 314 | event: AnalyticsEvent.ADVISORY_LOCK_CONTENTION, |
| 315 | userId: getUserIdForAnalytics(context, lockKey), |
| 316 | properties: { |
| 317 | ...context, |
| 318 | lockKey, |
| 319 | lockKeyType: lockKey.split(':')[0], |
| 320 | lockWaitMs, |
| 321 | lockWaitSeconds: lockWaitMs / 1000, |
| 322 | }, |
| 323 | logger, |
| 324 | }) |