* Backfills one page of rows: pulls each target output's value out of the rows' saved trace spans (materialized from object storage with bounded concurrency) and writes it into the row data. Returns the number of rows updated.
(opts: {
table: TableDefinition
outputs: WorkflowGroupOutput[]
overwrite: boolean
execs: Array<{ rowId: string; executionId: string | null }>
requestId: string
actorUserId?: string | null
})
| 99 | * Returns the number of rows updated. |
| 100 | */ |
| 101 | async function processBackfillPage(opts: { |
| 102 | table: TableDefinition |
| 103 | outputs: WorkflowGroupOutput[] |
| 104 | overwrite: boolean |
| 105 | execs: Array<{ rowId: string; executionId: string | null }> |
| 106 | requestId: string |
| 107 | actorUserId?: string | null |
| 108 | }): Promise<number> { |
| 109 | const { table, outputs, overwrite, execs, requestId, actorUserId } = opts |
| 110 | |
| 111 | const executionIdsByRow = new Map<string, string>() |
| 112 | for (const e of execs) { |
| 113 | if (!e.executionId) continue |
| 114 | executionIdsByRow.set(e.rowId, e.executionId) |
| 115 | } |
| 116 | if (executionIdsByRow.size === 0) return 0 |
| 117 | |
| 118 | const rowRecords = await db |
| 119 | .select({ id: userTableRows.id, data: userTableRows.data }) |
| 120 | .from(userTableRows) |
| 121 | .where( |
| 122 | and( |
| 123 | eq(userTableRows.tableId, table.id), |
| 124 | inArray(userTableRows.id, Array.from(executionIdsByRow.keys())) |
| 125 | ) |
| 126 | ) |
| 127 | |
| 128 | const executionIds = Array.from(new Set(executionIdsByRow.values())) |
| 129 | const logs = await db |
| 130 | .select({ |
| 131 | executionId: workflowExecutionLogs.executionId, |
| 132 | workflowId: workflowExecutionLogs.workflowId, |
| 133 | workspaceId: workflowExecutionLogs.workspaceId, |
| 134 | executionData: workflowExecutionLogs.executionData, |
| 135 | }) |
| 136 | .from(workflowExecutionLogs) |
| 137 | .where(inArray(workflowExecutionLogs.executionId, executionIds)) |
| 138 | |
| 139 | const logByExecutionId = new Map<string, { traceSpans?: BackfillTraceSpan[] }>() |
| 140 | // Heavy execution data may live in object storage; resolve pointers (bounded concurrency). |
| 141 | await mapWithConcurrency(logs, MATERIALIZE_CONCURRENCY, async (log) => { |
| 142 | const executionData = await materializeExecutionData( |
| 143 | log.executionData as Record<string, unknown> | null, |
| 144 | { workspaceId: log.workspaceId, workflowId: log.workflowId, executionId: log.executionId } |
| 145 | ) |
| 146 | logByExecutionId.set( |
| 147 | log.executionId, |
| 148 | (executionData as { traceSpans?: BackfillTraceSpan[] }) ?? {} |
| 149 | ) |
| 150 | }) |
| 151 | |
| 152 | const updates: Array<{ rowId: string; data: RowData }> = [] |
| 153 | for (const r of rowRecords) { |
| 154 | const execId = executionIdsByRow.get(r.id) |
| 155 | if (!execId) continue |
| 156 | const log = logByExecutionId.get(execId) |
| 157 | if (!log) continue |
| 158 |
no test coverage detected