( table: TableDefinition, data: BulkUpdateData, requestId: string )
| 1314 | * @returns Bulk operation result |
| 1315 | */ |
| 1316 | export async function updateRowsByFilter( |
| 1317 | table: TableDefinition, |
| 1318 | data: BulkUpdateData, |
| 1319 | requestId: string |
| 1320 | ): Promise<BulkOperationResult> { |
| 1321 | const tableName = USER_TABLE_ROWS_SQL_NAME |
| 1322 | |
| 1323 | const filterClause = buildFilterClause(data.filter, tableName, table.schema.columns) |
| 1324 | if (!filterClause) { |
| 1325 | throw new Error('Filter is required for bulk update') |
| 1326 | } |
| 1327 | |
| 1328 | const baseConditions = and( |
| 1329 | eq(userTableRows.tableId, table.id), |
| 1330 | eq(userTableRows.workspaceId, table.workspaceId) |
| 1331 | ) |
| 1332 | |
| 1333 | // Tenant-bounded: the jsonb filter is unestimatable and otherwise sends the planner to a |
| 1334 | // whole-shared-relation seq scan (14.4s measured on a 1M-row table). |
| 1335 | const matchingRows = await withSeqscanOff(async (trx) => { |
| 1336 | let query = trx |
| 1337 | .select({ id: userTableRows.id, data: userTableRows.data }) |
| 1338 | .from(userTableRows) |
| 1339 | .where(and(baseConditions, filterClause)) |
| 1340 | if (data.limit) { |
| 1341 | query = query.limit(data.limit) as typeof query |
| 1342 | } |
| 1343 | return query |
| 1344 | }) |
| 1345 | |
| 1346 | if (matchingRows.length === 0) { |
| 1347 | return { affectedCount: 0, affectedRowIds: [] } |
| 1348 | } |
| 1349 | |
| 1350 | // Coerce the patch itself in place — the write below persists `data.data` |
| 1351 | // (as `patchJson`), so coercion applied only to the per-row merged copies |
| 1352 | // would be discarded. The merged validation in the loop still enforces required |
| 1353 | // fields against the full row. |
| 1354 | coerceRowValues(data.data, table.schema) |
| 1355 | |
| 1356 | for (const row of matchingRows) { |
| 1357 | const existingData = row.data as RowData |
| 1358 | const mergedData = { ...existingData, ...data.data } |
| 1359 | |
| 1360 | const sizeValidation = validateRowSize(mergedData) |
| 1361 | if (!sizeValidation.valid) { |
| 1362 | throw new Error(`Row ${row.id}: ${sizeValidation.errors.join(', ')}`) |
| 1363 | } |
| 1364 | |
| 1365 | const schemaValidation = coerceRowToSchema(mergedData, table.schema) |
| 1366 | if (!schemaValidation.valid) { |
| 1367 | throw new Error(`Row ${row.id}: ${schemaValidation.errors.join(', ')}`) |
| 1368 | } |
| 1369 | } |
| 1370 | |
| 1371 | const uniqueColumns = getUniqueColumns(table.schema) |
| 1372 | const uniqueColumnsInUpdate = uniqueColumns.filter((col) => col.name in data.data) |
| 1373 | if (uniqueColumnsInUpdate.length > 0) { |
no test coverage detected