MCPcopy Index your code
hub / github.com/simstudioai/sim / readEvents

Function readEvents

apps/sim/lib/copilot/request/session/buffer.ts:173–202  ·  view source on GitHub ↗
(
  streamId: string,
  afterCursor: string
)

Source from the content-addressed store, hash-verified

171}
172
/**
 * Reads the persisted events of a stream that come after a given cursor.
 *
 * The cursor is converted to a number (an empty cursor is treated as `'0'`),
 * and every entry of the stream's events key whose score is at least
 * `cursor + 1` is fetched via `zrangebyscore`. Each raw entry is parsed with
 * `parsePersistedStreamEventEnvelopeJson`; entries that fail to parse are
 * logged with a warning and left out of the result instead of failing the read.
 *
 * @param streamId - Identifier of the stream whose events are read.
 * @param afterCursor - Cursor string; only events scored above it are returned.
 * @returns The successfully parsed event envelopes, in the order Redis returned them.
 * @throws InvalidCursorError when `afterCursor` does not convert to a finite number.
 */
export async function readEvents(
  streamId: string,
  afterCursor: string
): Promise<PersistedStreamEventEnvelope[]> {
  // NOTE(review): Number() also accepts fractional, negative and hex strings;
  // presumably cursors are always non-negative integer sequence numbers — confirm.
  const cursorSeq = Number(afterCursor || '0')
  if (!Number.isFinite(cursorSeq)) {
    throw new InvalidCursorError(streamId, afterCursor)
  }

  // The range is inclusive, so start one past the cursor to exclude the cursor itself.
  const lowestScore = cursorSeq + 1

  const serializedEvents = await withRedisRetry(
    { operation: 'read_events', streamId },
    async (redis) => redis.zrangebyscore(getEventsKey(streamId), lowestScore, '+inf')
  )

  const events: PersistedStreamEventEnvelope[] = []
  for (const serialized of serializedEvents) {
    const result = parsePersistedStreamEventEnvelopeJson(serialized)
    if (result.ok) {
      events.push(result.event)
      continue
    }

    // A single unreadable entry must not abort the whole read: report it and move on.
    logger.warn('Skipping corrupt outbox entry', {
      streamId,
      reason: result.reason,
      message: result.message,
      errors: result.errors,
    })
  }

  return events
}
203
204export async function getOldestSeq(streamId: string): Promise<number | null> {
205 return withRedisRetry({ operation: 'get_oldest_seq', streamId }, async (redis) => {

Callers 6

buffer.test.tsFile · 0.90
route.tsFile · 0.90
GETFunction · 0.90
handleResumeRequestBodyFunction · 0.90
flushEventsFunction · 0.90

Calls 5

withRedisRetryFunction · 0.70
getEventsKeyFunction · 0.70
warnMethod · 0.65
pushMethod · 0.45

Tested by

no test coverage detected