( chatId: string, timeoutMs = 5_000, expectedStreamId?: string )
| 69 | } |
| 70 | |
| 71 | export async function waitForPendingChatStream( |
| 72 | chatId: string, |
| 73 | timeoutMs = 5_000, |
| 74 | expectedStreamId?: string |
| 75 | ): Promise<boolean> { |
| 76 | const redis = getRedisClient() |
| 77 | const deadline = Date.now() + timeoutMs |
| 78 | |
| 79 | for (;;) { |
| 80 | const entry = pendingChatStreams.get(chatId) |
| 81 | const localPending = !!entry && (!expectedStreamId || entry.streamId === expectedStreamId) |
| 82 | |
| 83 | if (redis) { |
| 84 | try { |
| 85 | const ownerStreamId = await redis.get(getChatStreamLockKey(chatId)) |
| 86 | const lockReleased = |
| 87 | !ownerStreamId || (expectedStreamId !== undefined && ownerStreamId !== expectedStreamId) |
| 88 | if (!localPending && lockReleased) { |
| 89 | return true |
| 90 | } |
| 91 | } catch (error) { |
| 92 | logger.warn('Failed to inspect chat stream lock while waiting', { |
| 93 | chatId, |
| 94 | expectedStreamId, |
| 95 | error: toError(error).message, |
| 96 | }) |
| 97 | } |
| 98 | } else if (!localPending) { |
| 99 | return true |
| 100 | } |
| 101 | |
| 102 | if (Date.now() >= deadline) { |
| 103 | return false |
| 104 | } |
| 105 | await sleep(200) |
| 106 | } |
| 107 | } |
| 108 | |
| 109 | export async function getPendingChatStreamId(chatId: string): Promise<string | null> { |
| 110 | const localEntry = pendingChatStreams.get(chatId) |
no test coverage detected