MCPcopy Index your code
hub / github.com/simstudioai/sim / runWithRedisMutex

Function runWithRedisMutex

apps/sim/lib/mcp/oauth/storage.ts:312–366  ·  view source on GitHub ↗
(
  lockKey: string,
  rowId: string,
  fn: () => Promise<T>
)

Source from the content-addressed store, hash-verified

310}
311
312async function runWithRedisMutex<T>(
313 lockKey: string,
314 rowId: string,
315 fn: () => Promise<T>
316): Promise<T> {
317 const ownerToken = generateShortId()
318 const deadline = Date.now() + REFRESH_MAX_WAIT_MS
319
320 while (true) {
321 let acquired = false
322 try {
323 acquired = await acquireLock(lockKey, ownerToken, REFRESH_LOCK_TTL_SEC)
324 } catch (error) {
325 logger.warn('Redis unavailable, running OAuth flow uncoordinated', {
326 rowId,
327 error: toError(error).message,
328 })
329 return fn()
330 }
331
332 if (acquired) {
333 const watchdog = setInterval(() => {
334 extendLock(lockKey, ownerToken, REFRESH_LOCK_TTL_SEC).catch((error) => {
335 logger.warn('Refresh lock extend failed', {
336 rowId,
337 error: toError(error).message,
338 })
339 })
340 }, REFRESH_LOCK_EXTEND_INTERVAL_MS)
341 try {
342 return await fn()
343 } finally {
344 clearInterval(watchdog)
345 await releaseLock(lockKey, ownerToken).catch((error) => {
346 logger.warn('Refresh lock release failed (will expire via TTL)', {
347 rowId,
348 error: toError(error).message,
349 })
350 })
351 }
352 }
353
354 if (Date.now() >= deadline) {
355 // Lock still held by another process AND its watchdog is keeping it
356 // alive — falling open would let us refresh concurrently and race the
357 // rotating refresh token. Throw and let the caller decide whether to
358 // retry; the Redis-down path remains the only branch that runs `fn()`
359 // uncoordinated (no coordination available there).
360 throw new Error(
361 `MCP OAuth refresh lock for ${rowId} held longer than ${REFRESH_MAX_WAIT_MS}ms`
362 )
363 }
364 await sleep(REFRESH_POLL_INTERVAL_MS)
365 }
366}

Callers 1

withMcpOauthRefreshLockFunction · 0.85

Calls 8

generateShortIdFunction · 0.90
acquireLockFunction · 0.90
toErrorFunction · 0.90
extendLockFunction · 0.90
releaseLockFunction · 0.90
sleepFunction · 0.90
warnMethod · 0.65
fnFunction · 0.50

Tested by

no test coverage detected