( data: BatchUpdateByIdData, table: TableDefinition, requestId: string )
| 1452 | * Avoids the race condition of parallel update_row calls overwriting each other. |
| 1453 | */ |
| 1454 | export async function batchUpdateRows( |
| 1455 | data: BatchUpdateByIdData, |
| 1456 | table: TableDefinition, |
| 1457 | requestId: string |
| 1458 | ): Promise<BulkOperationResult> { |
| 1459 | if (data.updates.length === 0) { |
| 1460 | return { affectedCount: 0, affectedRowIds: [] } |
| 1461 | } |
| 1462 | |
| 1463 | const rowIds = data.updates.map((u) => u.rowId) |
| 1464 | const existingRows = await db |
| 1465 | .select({ |
| 1466 | id: userTableRows.id, |
| 1467 | data: userTableRows.data, |
| 1468 | }) |
| 1469 | .from(userTableRows) |
| 1470 | .where( |
| 1471 | and( |
| 1472 | eq(userTableRows.tableId, data.tableId), |
| 1473 | eq(userTableRows.workspaceId, data.workspaceId), |
| 1474 | inArray(userTableRows.id, rowIds) |
| 1475 | ) |
| 1476 | ) |
| 1477 | |
| 1478 | const executionsByRow = await loadExecutionsByRow( |
| 1479 | db, |
| 1480 | existingRows.map((r) => r.id) |
| 1481 | ) |
| 1482 | |
| 1483 | type ExistingRow = { data: RowData; executions: RowExecutions } |
| 1484 | const existingMap = new Map<string, ExistingRow>( |
| 1485 | existingRows.map((r) => [ |
| 1486 | r.id, |
| 1487 | { data: r.data as RowData, executions: executionsByRow.get(r.id) ?? {} }, |
| 1488 | ]) |
| 1489 | ) |
| 1490 | |
| 1491 | const missing = rowIds.filter((id) => !existingMap.has(id)) |
| 1492 | if (missing.length > 0) { |
| 1493 | throw new Error(`Rows not found: ${missing.join(', ')}`) |
| 1494 | } |
| 1495 | |
| 1496 | const mergedUpdates: Array<{ |
| 1497 | rowId: string |
| 1498 | mergedData: RowData |
| 1499 | mergedExecutions: RowExecutions |
| 1500 | executionsPatch?: Record<string, RowExecutionMetadata | null> |
| 1501 | inFlightDownstreamGroups: string[] |
| 1502 | }> = [] |
| 1503 | for (const update of data.updates) { |
| 1504 | const existing = existingMap.get(update.rowId)! |
| 1505 | const merged = { ...existing.data, ...update.data } |
| 1506 | // Auto-clear exec records for workflow output columns the user just |
| 1507 | // wiped AND downstream dep-changed terminal groups — same rationale as |
| 1508 | // `updateRow`. Per-row in-flight downstream groups are surfaced so we |
| 1509 | // can run the cancel+rerun orchestration after the batch commits. |
| 1510 | const { executionsPatch: effectiveExecutionsPatch, inFlightDownstreamGroups } = |
| 1511 | deriveExecClearsForDataPatch( |
no test coverage detected