MCPcopy
hub / github.com/simstudioai/sim / processBackfillPage

Function processBackfillPage

apps/sim/lib/table/backfill-runner.ts:101–182  ·  view source on GitHub ↗

Backfills one page of rows: pulls each target output's value out of the rows' saved trace spans (materialized from object storage with bounded concurrency) and writes it into row data. Returns the number of rows updated.

(opts: {
  table: TableDefinition
  outputs: WorkflowGroupOutput[]
  overwrite: boolean
  execs: Array<{ rowId: string; executionId: string | null }>
  requestId: string
  actorUserId?: string | null
})

Source from the content-addressed store, hash-verified

99 * Returns the number of rows updated.
100 */
101async function processBackfillPage(opts: {
102 table: TableDefinition
103 outputs: WorkflowGroupOutput[]
104 overwrite: boolean
105 execs: Array<{ rowId: string; executionId: string | null }>
106 requestId: string
107 actorUserId?: string | null
108}): Promise<number> {
109 const { table, outputs, overwrite, execs, requestId, actorUserId } = opts
110
111 const executionIdsByRow = new Map<string, string>()
112 for (const e of execs) {
113 if (!e.executionId) continue
114 executionIdsByRow.set(e.rowId, e.executionId)
115 }
116 if (executionIdsByRow.size === 0) return 0
117
118 const rowRecords = await db
119 .select({ id: userTableRows.id, data: userTableRows.data })
120 .from(userTableRows)
121 .where(
122 and(
123 eq(userTableRows.tableId, table.id),
124 inArray(userTableRows.id, Array.from(executionIdsByRow.keys()))
125 )
126 )
127
128 const executionIds = Array.from(new Set(executionIdsByRow.values()))
129 const logs = await db
130 .select({
131 executionId: workflowExecutionLogs.executionId,
132 workflowId: workflowExecutionLogs.workflowId,
133 workspaceId: workflowExecutionLogs.workspaceId,
134 executionData: workflowExecutionLogs.executionData,
135 })
136 .from(workflowExecutionLogs)
137 .where(inArray(workflowExecutionLogs.executionId, executionIds))
138
139 const logByExecutionId = new Map<string, { traceSpans?: BackfillTraceSpan[] }>()
140 // Heavy execution data may live in object storage; resolve pointers (bounded concurrency).
141 await mapWithConcurrency(logs, MATERIALIZE_CONCURRENCY, async (log) => {
142 const executionData = await materializeExecutionData(
143 log.executionData as Record<string, unknown> | null,
144 { workspaceId: log.workspaceId, workflowId: log.workflowId, executionId: log.executionId }
145 )
146 logByExecutionId.set(
147 log.executionId,
148 (executionData as { traceSpans?: BackfillTraceSpan[] }) ?? {}
149 )
150 })
151
152 const updates: Array<{ rowId: string; data: RowData }> = []
153 for (const r of rowRecords) {
154 const execId = executionIdsByRow.get(r.id)
155 if (!execId) continue
156 const log = logByExecutionId.get(execId)
157 if (!log) continue
158

Callers 2

runTableBackfill (function) · 0.85

Calls 9

mapWithConcurrency (function) · 0.90
materializeExecutionData (function) · 0.90
pluckByPath (function) · 0.90
batchUpdateRows (function) · 0.90
findSpanByBlockId (function) · 0.85
set (method) · 0.65
get (method) · 0.65
eq (function) · 0.50
push (method) · 0.45

Tested by

no test coverage detected