( chatId: string, streamId: string, timeoutMs = 5_000 )
| 187 | } |
| 188 | |
| 189 | export async function acquirePendingChatStream( |
| 190 | chatId: string, |
| 191 | streamId: string, |
| 192 | timeoutMs = 5_000 |
| 193 | ): Promise<boolean> { |
| 194 | // Span records wall time spent waiting for the per-chat stream lock. |
| 195 | // Typical case: sub-10ms uncontested acquire. Worst case: up to |
| 196 | // `timeoutMs` spent polling while a prior stream finishes. Previously |
| 197 | // this time looked like "unexplained gap before llm.stream". |
| 198 | return withCopilotSpan( |
| 199 | TraceSpan.CopilotChatAcquirePendingStreamLock, |
| 200 | { |
| 201 | [TraceAttr.ChatId]: chatId, |
| 202 | [TraceAttr.StreamId]: streamId, |
| 203 | [TraceAttr.LockTimeoutMs]: timeoutMs, |
| 204 | }, |
| 205 | async (span) => { |
| 206 | const redis = getRedisClient() |
| 207 | span.setAttribute(TraceAttr.LockBackend, redis ? AbortBackend.Redis : AbortBackend.InProcess) |
| 208 | if (redis) { |
| 209 | const deadline = Date.now() + timeoutMs |
| 210 | for (;;) { |
| 211 | try { |
| 212 | const acquired = await acquireLock( |
| 213 | getChatStreamLockKey(chatId), |
| 214 | streamId, |
| 215 | CHAT_STREAM_LOCK_TTL_SECONDS |
| 216 | ) |
| 217 | if (acquired) { |
| 218 | registerPendingChatStream(chatId, streamId) |
| 219 | span.setAttribute(TraceAttr.LockAcquired, true) |
| 220 | return true |
| 221 | } |
| 222 | if (!pendingChatStreams.has(chatId)) { |
| 223 | const ownerStreamId = await redis.get(getChatStreamLockKey(chatId)) |
| 224 | if (ownerStreamId) { |
| 225 | const settled = await waitForPendingChatStream(chatId, 0, ownerStreamId) |
| 226 | if (settled) { |
| 227 | continue |
| 228 | } |
| 229 | } |
| 230 | } |
| 231 | } catch (error) { |
| 232 | logger.warn('Failed to acquire chat stream lock', { |
| 233 | chatId, |
| 234 | streamId, |
| 235 | error: toError(error).message, |
| 236 | }) |
| 237 | } |
| 238 | |
| 239 | if (Date.now() >= deadline) { |
| 240 | span.setAttribute(TraceAttr.LockAcquired, false) |
| 241 | span.setAttribute(TraceAttr.LockTimedOut, true) |
| 242 | return false |
| 243 | } |
| 244 | await sleep(200) |
| 245 | } |
| 246 | } |
no test coverage detected