MCPcopy Index your code
hub / github.com/simstudioai/sim / acquirePendingChatStream

Function acquirePendingChatStream

apps/sim/lib/copilot/request/session/abort.ts:189–268  ·  view source on GitHub ↗
(
  chatId: string,
  streamId: string,
  timeoutMs = 5_000
)

Source from the content-addressed store, hash-verified

187}
188
189export async function acquirePendingChatStream(
190 chatId: string,
191 streamId: string,
192 timeoutMs = 5_000
193): Promise<boolean> {
194 // Span records wall time spent waiting for the per-chat stream lock.
195 // Typical case: sub-10ms uncontested acquire. Worst case: up to
196 // `timeoutMs` spent polling while a prior stream finishes. Previously
197 // this time looked like "unexplained gap before llm.stream".
198 return withCopilotSpan(
199 TraceSpan.CopilotChatAcquirePendingStreamLock,
200 {
201 [TraceAttr.ChatId]: chatId,
202 [TraceAttr.StreamId]: streamId,
203 [TraceAttr.LockTimeoutMs]: timeoutMs,
204 },
205 async (span) => {
206 const redis = getRedisClient()
207 span.setAttribute(TraceAttr.LockBackend, redis ? AbortBackend.Redis : AbortBackend.InProcess)
208 if (redis) {
209 const deadline = Date.now() + timeoutMs
210 for (;;) {
211 try {
212 const acquired = await acquireLock(
213 getChatStreamLockKey(chatId),
214 streamId,
215 CHAT_STREAM_LOCK_TTL_SECONDS
216 )
217 if (acquired) {
218 registerPendingChatStream(chatId, streamId)
219 span.setAttribute(TraceAttr.LockAcquired, true)
220 return true
221 }
222 if (!pendingChatStreams.has(chatId)) {
223 const ownerStreamId = await redis.get(getChatStreamLockKey(chatId))
224 if (ownerStreamId) {
225 const settled = await waitForPendingChatStream(chatId, 0, ownerStreamId)
226 if (settled) {
227 continue
228 }
229 }
230 }
231 } catch (error) {
232 logger.warn('Failed to acquire chat stream lock', {
233 chatId,
234 streamId,
235 error: toError(error).message,
236 })
237 }
238
239 if (Date.now() >= deadline) {
240 span.setAttribute(TraceAttr.LockAcquired, false)
241 span.setAttribute(TraceAttr.LockTimedOut, true)
242 return false
243 }
244 await sleep(200)
245 }
246 }

Callers 2

abort.test.tsFile · 0.90
handleUnifiedChatPostFunction · 0.90

Calls 11

withCopilotSpanFunction · 0.90
getRedisClientFunction · 0.90
acquireLockFunction · 0.90
toErrorFunction · 0.90
sleepFunction · 0.90
getChatStreamLockKeyFunction · 0.85
waitForPendingChatStreamFunction · 0.85
getMethod · 0.65
warnMethod · 0.65
resolveFunction · 0.50

Tested by

no test coverage detected