( data: BulkImportBatch, table: TableDefinition, requestId: string )
| 58 | * the batch for crossing `max_rows`; the caller marks the import failed. |
| 59 | */ |
| 60 | export async function bulkInsertImportBatch( |
| 61 | data: BulkImportBatch, |
| 62 | table: TableDefinition, |
| 63 | requestId: string |
| 64 | ): Promise<{ inserted: number; lastOrderKey: string | null }> { |
| 65 | for (let i = 0; i < data.rows.length; i++) { |
| 66 | const sizeValidation = validateRowSize(data.rows[i]) |
| 67 | if (!sizeValidation.valid) { |
| 68 | throw new Error(`Row ${i + 1}: ${sizeValidation.errors.join(', ')}`) |
| 69 | } |
| 70 | const schemaValidation = coerceRowToSchema(data.rows[i], table.schema) |
| 71 | if (!schemaValidation.valid) { |
| 72 | throw new Error(`Row ${i + 1}: ${schemaValidation.errors.join(', ')}`) |
| 73 | } |
| 74 | } |
| 75 | |
| 76 | const uniqueColumns = getUniqueColumns(table.schema) |
| 77 | if (uniqueColumns.length > 0) { |
| 78 | const uniqueResult = await checkBatchUniqueConstraintsDb( |
| 79 | data.tableId, |
| 80 | data.rows, |
| 81 | table.schema, |
| 82 | db |
| 83 | ) |
| 84 | if (!uniqueResult.valid) { |
| 85 | throw new Error( |
| 86 | uniqueResult.errors.map((e) => `Row ${e.row + 1}: ${e.errors.join(', ')}`).join('; ') |
| 87 | ) |
| 88 | } |
| 89 | } |
| 90 | |
| 91 | const now = new Date() |
| 92 | // Import worker is the table's sole writer; append keys after the anchor the caller threads |
| 93 | // from the previous batch's last key — no per-batch max(order_key) scan over a growing table. |
| 94 | const orderKeys = nKeysBetween(data.afterOrderKey ?? null, null, data.rows.length) |
| 95 | const rowsToInsert = data.rows.map((rowData, i) => ({ |
| 96 | id: `row_${generateId().replace(/-/g, '')}`, |
| 97 | tableId: data.tableId, |
| 98 | workspaceId: data.workspaceId, |
| 99 | data: rowData, |
| 100 | position: data.startPosition + i, |
| 101 | orderKey: orderKeys[i], |
| 102 | createdAt: now, |
| 103 | updatedAt: now, |
| 104 | ...(data.userId ? { createdBy: data.userId } : {}), |
| 105 | })) |
| 106 | |
| 107 | await db.insert(userTableRows).values(rowsToInsert) |
| 108 | logger.info(`[${requestId}] Bulk-imported ${rowsToInsert.length} rows into table ${data.tableId}`) |
| 109 | return { |
| 110 | inserted: rowsToInsert.length, |
| 111 | lastOrderKey: orderKeys[orderKeys.length - 1] ?? data.afterOrderKey ?? null, |
| 112 | } |
| 113 | } |
| 114 | |
| 115 | /** Deletes every row of a table (set-based; the statement-level trigger zeroes `row_count`). */ |
| 116 | export async function deleteAllTableRows(tableId: string): Promise<void> { |
no test coverage detected