* Cursors on `endedAt` (terminal timestamp) rather than `startedAt`. A running * row's mutable fields (`endedAt`, `status`, `totalDurationMs`, `executionData`) * would otherwise be exported mid-flight and never re-emitted with their final * values. Filtering on `endedAt IS NOT NULL` guarantees ro
(input: SourcePageInput)
| 23 | * once visible to the drain. |
| 24 | */ |
| 25 | async function* pages(input: SourcePageInput): AsyncIterable<WorkflowLogRow[]> { |
| 26 | const workspaceIds = await getOrganizationWorkspaceIds(input.organizationId) |
| 27 | if (workspaceIds.length === 0) return |
| 28 | |
| 29 | let cursor = decodeTimeCursor(input.cursor) |
| 30 | while (!input.signal.aborted) { |
| 31 | const cursorClause = timeCursorPredicate( |
| 32 | workflowExecutionLogs.endedAt, |
| 33 | workflowExecutionLogs.id, |
| 34 | cursor |
| 35 | ) |
| 36 | |
| 37 | const rows = await dbReplica |
| 38 | .select() |
| 39 | .from(workflowExecutionLogs) |
| 40 | .where( |
| 41 | and( |
| 42 | inArray(workflowExecutionLogs.workspaceId, workspaceIds), |
| 43 | isNotNull(workflowExecutionLogs.endedAt), |
| 44 | timeCursorStabilityBound(workflowExecutionLogs.endedAt), |
| 45 | cursorClause |
| 46 | ) |
| 47 | ) |
| 48 | .orderBy(...timeCursorOrderBy(workflowExecutionLogs.endedAt, workflowExecutionLogs.id)) |
| 49 | .limit(input.chunkSize) |
| 50 | |
| 51 | if (rows.length === 0) return |
| 52 | |
| 53 | // Heavy execution data may live in object storage; resolve pointers (bounded |
| 54 | // concurrency) so the drain exports full execution data, not the slim row. |
| 55 | // Use the order-preserving returned array (the util's documented contract) |
| 56 | // and write back, rather than mutating rows inside the mapper. |
| 57 | const materialized = await mapWithConcurrency(rows, MATERIALIZE_CONCURRENCY, (row) => |
| 58 | materializeExecutionData(row.executionData as Record<string, unknown> | null, { |
| 59 | workspaceId: row.workspaceId, |
| 60 | workflowId: row.workflowId, |
| 61 | executionId: row.executionId, |
| 62 | }) |
| 63 | ) |
| 64 | for (let i = 0; i < rows.length; i++) { |
| 65 | rows[i].executionData = materialized[i] as WorkflowLogRow['executionData'] |
| 66 | } |
| 67 | |
| 68 | yield rows |
| 69 | const last = rows[rows.length - 1] |
| 70 | cursor = { ts: last.endedAt!.toISOString(), id: last.id } |
| 71 | if (rows.length < input.chunkSize) return |
| 72 | } |
| 73 | } |
| 74 | |
| 75 | export const workflowLogsSource: DrainSource<WorkflowLogRow> = { |
| 76 | type: 'workflow_logs', |
nothing calls this directly
no test coverage detected