(payload: TableUpdatePayload)
| 59 | * failed via `markTableUpdateFailed`. A superseded run returns quietly. |
| 60 | */ |
| 61 | export async function runTableUpdate(payload: TableUpdatePayload): Promise<void> { |
| 62 | const { jobId, tableId, workspaceId, filter, data, cutoff, maxRows } = payload |
| 63 | const requestId = generateId().slice(0, 8) |
| 64 | const budget = maxRows ?? Number.POSITIVE_INFINITY |
| 65 | |
| 66 | try { |
| 67 | const table = await getTableById(tableId, { includeArchived: true }) |
| 68 | if (!table) throw new Error(`Update target table ${tableId} not found`) |
| 69 | |
| 70 | const filterClause = buildFilterClause(filter, USER_TABLE_ROWS_SQL_NAME, table.schema.columns) |
| 71 | if (!filterClause) throw new Error('Filter is required for bulk update') |
| 72 | |
| 73 | // Coerce the patch once to the schema's types — the merged validation below and the persisted |
| 74 | // JSONB merge both use this normalized copy. |
| 75 | coerceRowValues(data, table.schema) |
| 76 | const patchJson = JSON.stringify(data) |
| 77 | |
| 78 | // Resume the persisted count: a retried attempt's earlier pages are already committed, so |
| 79 | // starting at zero would overwrite cumulative progress. Doubles as the initial ownership gate. |
| 80 | const resumed = await getJobProgress(tableId, jobId) |
| 81 | if (resumed === null) throw new JobSupersededError() |
| 82 | |
| 83 | let processed = resumed |
| 84 | let lastReported = resumed |
| 85 | let afterId: string | undefined |
| 86 | |
| 87 | while (processed < budget) { |
| 88 | const owns = await updateJobProgress(tableId, processed, jobId) |
| 89 | if (!owns) throw new JobSupersededError() |
| 90 | |
| 91 | const page = await selectRowDataPage({ |
| 92 | tableId, |
| 93 | workspaceId, |
| 94 | cutoff, |
| 95 | filterClause, |
| 96 | afterId, |
| 97 | limit: Math.min(TABLE_LIMITS.DELETE_PAGE_SIZE, budget - processed), |
| 98 | // Skip rows already carrying the patch so a retried run resumes without re-walking / |
| 99 | // double-counting the rows an earlier attempt updated (updated rows still exist and may |
| 100 | // still match the filter, unlike deletes). |
| 101 | excludeIfPatched: patchJson, |
| 102 | }) |
| 103 | if (page.length === 0) break |
| 104 | afterId = page[page.length - 1].id |
| 105 | |
| 106 | // Validate each merged result before writing the page — a row that would overflow the size |
| 107 | // cap or violate the schema fails the job (earlier pages stay applied; best-effort). |
| 108 | for (const row of page) { |
| 109 | const merged = { ...row.data, ...data } |
| 110 | const sizeValidation = validateRowSize(merged) |
| 111 | if (!sizeValidation.valid) { |
| 112 | throw new Error(`Row ${row.id}: ${sizeValidation.errors.join(', ')}`) |
| 113 | } |
| 114 | const schemaValidation = coerceRowToSchema(merged, table.schema) |
| 115 | if (!schemaValidation.valid) { |
| 116 | throw new Error(`Row ${row.id}: ${schemaValidation.errors.join(', ')}`) |
| 117 | } |
| 118 | } |
no test coverage detected