MCPcopy Index your code
hub / github.com/simstudioai/sim / withMcpOauthRefreshLock

Function withMcpOauthRefreshLock

apps/sim/lib/mcp/oauth/storage.ts:272–310  ·  view source on GitHub ↗
(rowId: string, fn: () => Promise<T>)

Source from the content-addressed store, hash-verified

270const inflightChains = new Map<string, Promise<unknown>>()
271
272export async function withMcpOauthRefreshLock<T>(rowId: string, fn: () => Promise<T>): Promise<T> {
273 const lockKey = `mcp:oauth:refresh:${rowId}`
274 const prev = inflightChains.get(lockKey) ?? Promise.resolve()
275 const prevSettled = prev.catch(() => undefined)
276
277 let queueTimedOut = false
278 const next = prevSettled.then(() => {
279 if (queueTimedOut) {
280 throw new Error(`MCP OAuth refresh queue for ${rowId} abandoned after timeout`)
281 }
282 return runWithRedisMutex(lockKey, rowId, fn)
283 })
284 inflightChains.set(lockKey, next)
285 const cleanup = () => {
286 if (inflightChains.get(lockKey) === next) inflightChains.delete(lockKey)
287 }
288 next.then(cleanup, cleanup)
289
290 let queueTimer: ReturnType<typeof setTimeout> | undefined
291 const queueDeadline = new Promise<never>((_, reject) => {
292 queueTimer = setTimeout(() => {
293 queueTimedOut = true
294 reject(
295 new Error(
296 `MCP OAuth refresh queue for ${rowId} stalled for ${REFRESH_QUEUE_WAIT_TIMEOUT_MS}ms`
297 )
298 )
299 }, REFRESH_QUEUE_WAIT_TIMEOUT_MS)
300 queueTimer.unref?.()
301 })
302
303 try {
304 await Promise.race([prevSettled, queueDeadline])
305 } finally {
306 clearTimeout(queueTimer)
307 }
308
309 return next
310}
311
312async function runWithRedisMutex<T>(
313 lockKey: string,

Callers 2

createClientMethod · 0.90
storage.test.tsFile · 0.90

Calls 4

runWithRedisMutexFunction · 0.85
getMethod · 0.65
resolveMethod · 0.65
setMethod · 0.65

Tested by

no test coverage detected