MCPcopy Index your code
hub / github.com/simstudioai/sim / pages

Function pages

apps/sim/lib/data-drains/sources/workflow-logs.ts:25–73  ·  view source on GitHub ↗

* Cursors on `endedAt` (terminal timestamp) rather than `startedAt`. A running * row's mutable fields (`endedAt`, `status`, `totalDurationMs`, `executionData`) * would otherwise be exported mid-flight and never re-emitted with their final * values. Filtering on `endedAt IS NOT NULL` guarantees ro

(input: SourcePageInput)

Source from the content-addressed store, hash-verified

23 * once visible to the drain.
24 */
25async function* pages(input: SourcePageInput): AsyncIterable<WorkflowLogRow[]> {
26 const workspaceIds = await getOrganizationWorkspaceIds(input.organizationId)
27 if (workspaceIds.length === 0) return
28
29 let cursor = decodeTimeCursor(input.cursor)
30 while (!input.signal.aborted) {
31 const cursorClause = timeCursorPredicate(
32 workflowExecutionLogs.endedAt,
33 workflowExecutionLogs.id,
34 cursor
35 )
36
37 const rows = await dbReplica
38 .select()
39 .from(workflowExecutionLogs)
40 .where(
41 and(
42 inArray(workflowExecutionLogs.workspaceId, workspaceIds),
43 isNotNull(workflowExecutionLogs.endedAt),
44 timeCursorStabilityBound(workflowExecutionLogs.endedAt),
45 cursorClause
46 )
47 )
48 .orderBy(...timeCursorOrderBy(workflowExecutionLogs.endedAt, workflowExecutionLogs.id))
49 .limit(input.chunkSize)
50
51 if (rows.length === 0) return
52
53 // Heavy execution data may live in object storage; resolve pointers (bounded
54 // concurrency) so the drain exports full execution data, not the slim row.
55 // Use the order-preserving returned array (the util's documented contract)
56 // and write back, rather than mutating rows inside the mapper.
57 const materialized = await mapWithConcurrency(rows, MATERIALIZE_CONCURRENCY, (row) =>
58 materializeExecutionData(row.executionData as Record<string, unknown> | null, {
59 workspaceId: row.workspaceId,
60 workflowId: row.workflowId,
61 executionId: row.executionId,
62 })
63 )
64 for (let i = 0; i < rows.length; i++) {
65 rows[i].executionData = materialized[i] as WorkflowLogRow['executionData']
66 }
67
68 yield rows
69 const last = rows[rows.length - 1]
70 cursor = { ts: last.endedAt!.toISOString(), id: last.id }
71 if (rows.length < input.chunkSize) return
72 }
73}
74
75export const workflowLogsSource: DrainSource<WorkflowLogRow> = {
76 type: 'workflow_logs',

Callers

nothing calls this directly

Calls 7

decodeTimeCursorFunction · 0.90
timeCursorPredicateFunction · 0.90
timeCursorStabilityBoundFunction · 0.90
timeCursorOrderByFunction · 0.90
mapWithConcurrencyFunction · 0.90
materializeExecutionDataFunction · 0.90

Tested by

no test coverage detected