MCPcopy Index your code
hub / github.com/simstudioai/sim / runTableUpdate

Function runTableUpdate

apps/sim/lib/table/update-runner.ts:61–174  ·  view source on GitHub ↗
(payload: TableUpdatePayload)

Source from the content-addressed store, hash-verified

59 * failed via `markTableUpdateFailed`. A superseded run returns quietly.
60 */
61export async function runTableUpdate(payload: TableUpdatePayload): Promise<void> {
62 const { jobId, tableId, workspaceId, filter, data, cutoff, maxRows } = payload
63 const requestId = generateId().slice(0, 8)
64 const budget = maxRows ?? Number.POSITIVE_INFINITY
65
66 try {
67 const table = await getTableById(tableId, { includeArchived: true })
68 if (!table) throw new Error(`Update target table ${tableId} not found`)
69
70 const filterClause = buildFilterClause(filter, USER_TABLE_ROWS_SQL_NAME, table.schema.columns)
71 if (!filterClause) throw new Error('Filter is required for bulk update')
72
73 // Coerce the patch once to the schema's types — the merged validation below and the persisted
74 // JSONB merge both use this normalized copy.
75 coerceRowValues(data, table.schema)
76 const patchJson = JSON.stringify(data)
77
78 // Resume the persisted count: a retried attempt's earlier pages are already committed, so
79 // starting at zero would overwrite cumulative progress. Doubles as the initial ownership gate.
80 const resumed = await getJobProgress(tableId, jobId)
81 if (resumed === null) throw new JobSupersededError()
82
83 let processed = resumed
84 let lastReported = resumed
85 let afterId: string | undefined
86
87 while (processed < budget) {
88 const owns = await updateJobProgress(tableId, processed, jobId)
89 if (!owns) throw new JobSupersededError()
90
91 const page = await selectRowDataPage({
92 tableId,
93 workspaceId,
94 cutoff,
95 filterClause,
96 afterId,
97 limit: Math.min(TABLE_LIMITS.DELETE_PAGE_SIZE, budget - processed),
98 // Skip rows already carrying the patch so a retried run resumes without re-walking /
99 // double-counting the rows an earlier attempt updated (updated rows still exist and may
100 // still match the filter, unlike deletes).
101 excludeIfPatched: patchJson,
102 })
103 if (page.length === 0) break
104 afterId = page[page.length - 1].id
105
106 // Validate each merged result before writing the page — a row that would overflow the size
107 // cap or violate the schema fails the job (earlier pages stay applied; best-effort).
108 for (const row of page) {
109 const merged = { ...row.data, ...data }
110 const sizeValidation = validateRowSize(merged)
111 if (!sizeValidation.valid) {
112 throw new Error(`Row ${row.id}: ${sizeValidation.errors.join(', ')}`)
113 }
114 const schemaValidation = coerceRowToSchema(merged, table.schema)
115 if (!schemaValidation.valid) {
116 throw new Error(`Row ${row.id}: ${schemaValidation.errors.join(', ')}`)
117 }
118 }

Callers 3

dispatchUpdateJob · Function · 0.90
table-update.ts · File · 0.90

Calls 15

generateId · Function · 0.90
getTableById · Function · 0.90
buildFilterClause · Function · 0.90
coerceRowValues · Function · 0.90
getJobProgress · Function · 0.90
updateJobProgress · Function · 0.90
selectRowDataPage · Function · 0.90
validateRowSize · Function · 0.90
coerceRowToSchema · Function · 0.90
updatePageByIds · Function · 0.90
appendTableEvent · Function · 0.90
markJobReady · Function · 0.90

Tested by

no test coverage detected