( streamId: string, afterCursor: string )
| 171 | } |
| 172 | |
/**
 * Reads the persisted events of a stream that come after the given cursor.
 *
 * The cursor is converted to a number and the events key of the stream is
 * queried for every entry whose score is at least `cursor + 1`. Entries that
 * fail envelope parsing are logged at warn level and skipped, so a single
 * corrupt entry does not fail the whole read.
 *
 * @param streamId - Stream whose events are read.
 * @param afterCursor - Numeric string naming the last score already consumed;
 *   an empty string is treated as `'0'`.
 * @returns The successfully parsed envelopes, in the order Redis returned them.
 * @throws InvalidCursorError when the cursor does not convert to a safe integer.
 */
export async function readEvents(
  streamId: string,
  afterCursor: string
): Promise<PersistedStreamEventEnvelope[]> {
  const afterSeq = Number(afterCursor || '0')
  // A safe-integer check still rejects NaN and +/-Infinity, and additionally
  // rejects fractional cursors (where `+ 1` would skip the next whole score)
  // and magnitudes above 2^53 - 1 (where `+ 1` is lost to float rounding).
  if (!Number.isSafeInteger(afterSeq)) {
    throw new InvalidCursorError(streamId, afterCursor)
  }
  // The score range is inclusive, so start one past the cursor.
  const minScore = afterSeq + 1

  const rawEntries = await withRedisRetry({ operation: 'read_events', streamId }, async (redis) => {
    return redis.zrangebyscore(getEventsKey(streamId), minScore, '+inf')
  })

  const envelopes: PersistedStreamEventEnvelope[] = []
  for (const entry of rawEntries) {
    const parsed = parsePersistedStreamEventEnvelopeJson(entry)
    if (!parsed.ok) {
      // Best effort: report the corrupt entry and keep reading the rest.
      logger.warn('Skipping corrupt outbox entry', {
        streamId,
        reason: parsed.reason,
        message: parsed.message,
        errors: parsed.errors,
      })
      continue
    }
    envelopes.push(parsed.event)
  }
  return envelopes
}
| 203 | |
| 204 | export async function getOldestSeq(streamId: string): Promise<number | null> { |
| 205 | return withRedisRetry({ operation: 'get_oldest_seq', streamId }, async (redis) => { |
no test coverage detected