( streamId: string, ttlSeconds = DEFAULT_COMPLETED_TTL_SECONDS )
| 105 | } |
| 106 | |
/**
 * Applies an expiry of `ttlSeconds` to the events, sequence, and abort keys
 * derived from `streamId`, batching the three EXPIRE commands in one pipeline
 * run through `withRedisRetry`.
 *
 * Best-effort: any failure is logged as a warning and never thrown to the
 * caller.
 *
 * @param streamId - Stream identifier used to derive the three Redis keys.
 * @param ttlSeconds - Expiry, in seconds, applied to each key.
 */
export async function scheduleBufferCleanup(
  streamId: string,
  ttlSeconds = DEFAULT_COMPLETED_TTL_SECONDS
): Promise<void> {
  try {
    // Captured outside the callback so the per-command outcome can be checked
    // without throwing inside the retry wrapper (which could trigger retries).
    let pipelineResults: unknown = null

    await withRedisRetry({ operation: 'schedule_outbox_cleanup', streamId }, async (redis) => {
      const pipeline = redis.pipeline()
      pipeline.expire(getEventsKey(streamId), ttlSeconds)
      pipeline.expire(getSeqKey(streamId), ttlSeconds)
      pipeline.expire(getAbortKey(streamId), ttlSeconds)
      pipelineResults = await pipeline.exec()
    })

    // NOTE(review): assumes an ioredis-style client, where exec() resolves with
    // one [error, result] tuple per command instead of rejecting when a single
    // command fails — confirm against the client behind withRedisRetry. For
    // clients that resolve with plain values this check simply finds nothing.
    const commandErrors: string[] = []
    if (Array.isArray(pipelineResults)) {
      for (const entry of pipelineResults) {
        if (Array.isArray(entry) && entry[0] instanceof Error) {
          commandErrors.push(entry[0].message)
        }
      }
    }

    if (commandErrors.length > 0) {
      logger.warn('Failed to shorten stream buffer TTL during cleanup', {
        streamId,
        ttlSeconds,
        error: commandErrors.join('; '),
      })
    }
  } catch (error) {
    logger.warn('Failed to shorten stream buffer TTL during cleanup', {
      streamId,
      ttlSeconds,
      error: toError(error).message,
    })
  }
}
| 127 | |
| 128 | export async function appendEvents( |
| 129 | envelopes: PersistedStreamEventEnvelope[] |
no test coverage detected