( envelopes: PersistedStreamEventEnvelope[] )
| 126 | } |
| 127 | |
/**
 * Appends a batch of event envelopes to a stream's Redis-backed event log.
 *
 * Every envelope is written to the sorted set of the stream identified by the
 * FIRST envelope, scored by its `seq`. In the same pipeline the set is trimmed
 * to the `config.eventLimit` highest-scored entries, its TTL is refreshed, and
 * the sequence key is set to the `seq` of the LAST envelope in the batch.
 *
 * @param envelopes - Events to persist. NOTE(review): the code assumes they all
 *   belong to one stream and are ordered by ascending `seq`; neither is
 *   verified here — confirm against callers.
 * @returns The same `envelopes` array that was passed in. An empty batch
 *   returns immediately without touching Redis.
 * @throws The first per-command error reported by the pipeline, so the failure
 *   propagates to `withRedisRetry` instead of being silently dropped.
 */
export async function appendEvents(
  envelopes: PersistedStreamEventEnvelope[]
): Promise<PersistedStreamEventEnvelope[]> {
  if (envelopes.length === 0) {
    return envelopes
  }

  const streamId = envelopes[0].stream.streamId
  const lastSeq = envelopes[envelopes.length - 1].seq
  const config = getStreamConfig()

  await withRedisRetry({ operation: 'append_event', streamId }, async (redis) => {
    const key = getEventsKey(streamId)
    const seqKey = getSeqKey(streamId)

    // ZADD takes a flat list of alternating score/member arguments.
    const zaddArgs: Array<number | string> = []
    for (const envelope of envelopes) {
      zaddArgs.push(envelope.seq, JSON.stringify(envelope))
    }

    const pipeline = redis.pipeline()
    pipeline.zadd(key, ...(zaddArgs as [number, string, ...Array<number | string>]))
    // Drop everything below the `eventLimit` highest-ranked members.
    pipeline.zremrangebyrank(key, 0, -config.eventLimit - 1)
    pipeline.expire(key, config.ttlSeconds)
    pipeline.set(seqKey, String(lastSeq), 'EX', config.ttlSeconds)

    // A pipeline's exec() resolves with one [error, result] pair per command
    // rather than rejecting when a single command fails, so surface the first
    // failure explicitly; otherwise a failed write looks like success.
    const results = await pipeline.exec()
    for (const [commandError] of results ?? []) {
      if (commandError) {
        throw commandError
      }
    }
  })

  return envelopes
}
| 155 | |
| 156 | export async function appendEvent( |
| 157 | envelope: PersistedStreamEventEnvelope |
no test coverage detected