( metadata: RedisOperationMetadata, operation: (redis: NonNullable<ReturnType<typeof getRedisClient>>) => Promise<T> )
| 46 | } |
| 47 | |
| 48 | async function withRedisRetry<T>( |
| 49 | metadata: RedisOperationMetadata, |
| 50 | operation: (redis: NonNullable<ReturnType<typeof getRedisClient>>) => Promise<T> |
| 51 | ): Promise<T> { |
| 52 | const redis = getRedisClient() |
| 53 | if (!redis) { |
| 54 | throw new Error('Redis is required for mothership stream durability') |
| 55 | } |
| 56 | |
| 57 | let lastError: unknown |
| 58 | |
| 59 | for (let attempt = 0; attempt < RETRY_DELAYS_MS.length; attempt++) { |
| 60 | const delay = RETRY_DELAYS_MS[attempt] |
| 61 | if (delay > 0) { |
| 62 | await sleep(delay) |
| 63 | } |
| 64 | |
| 65 | try { |
| 66 | return await operation(redis) |
| 67 | } catch (error) { |
| 68 | lastError = error |
| 69 | logger.warn('Redis stream operation failed', { |
| 70 | operation: metadata.operation, |
| 71 | streamId: metadata.streamId, |
| 72 | attempt: attempt + 1, |
| 73 | error: toError(error).message, |
| 74 | }) |
| 75 | } |
| 76 | } |
| 77 | |
| 78 | throw lastError instanceof Error |
| 79 | ? lastError |
| 80 | : new Error(`${metadata.operation} failed for stream ${metadata.streamId}`) |
| 81 | } |
| 82 | |
| 83 | export async function allocateCursor(streamId: string): Promise<{ |
| 84 | seq: number |
no test coverage detected