* Cursors on terminal `endedAt` so in-flight rows (mutable `status`, `endedAt`, `totalDurationMs`, `executionData`) are not exported until finalized.
(input: SourcePageInput)
| 18 | * `totalDurationMs`, `executionData`) are not exported until finalized. |
| 19 | */ |
async function* pages(input: SourcePageInput): AsyncIterable<JobLogRow[]> {
  const orgWorkspaceIds = await getOrganizationWorkspaceIds(input.organizationId)
  if (orgWorkspaceIds.length === 0) {
    // No workspace ids came back for this organization, so there is nothing to query.
    return
  }

  let position = decodeTimeCursor(input.cursor)

  for (;;) {
    // The abort flag is checked once per page, before the next query is issued.
    if (input.signal.aborted) {
      return
    }

    const afterPosition = timeCursorPredicate(
      jobExecutionLogs.endedAt,
      jobExecutionLogs.id,
      position
    )
    const filter = and(
      inArray(jobExecutionLogs.workspaceId, orgWorkspaceIds),
      isNotNull(jobExecutionLogs.endedAt),
      timeCursorStabilityBound(jobExecutionLogs.endedAt),
      afterPosition
    )

    const page = await dbReplica
      .select()
      .from(jobExecutionLogs)
      .where(filter)
      .orderBy(...timeCursorOrderBy(jobExecutionLogs.endedAt, jobExecutionLogs.id))
      .limit(input.chunkSize)

    if (page.length === 0) {
      return
    }

    yield page

    // Advance the cursor to the final row of the page just handed out. The query
    // filters on isNotNull(endedAt), which is what the non-null assertion relies on.
    const tail = page[page.length - 1]
    position = { ts: tail.endedAt!.toISOString(), id: tail.id }

    // A page shorter than the requested chunk size ends the iteration.
    if (page.length < input.chunkSize) {
      return
    }
  }
}
| 49 | |
| 50 | export const jobLogsSource: DrainSource<JobLogRow> = { |
| 51 | type: 'job_logs', |
nothing calls this directly
no test coverage detected