MCPcopy Index your code
hub / github.com/simstudioai/sim / bulkInsertImportBatch

Function bulkInsertImportBatch

apps/sim/lib/table/import-data.ts:60–113  ·  view source on GitHub ↗
(
  data: BulkImportBatch,
  table: TableDefinition,
  requestId: string
)

Source from the content-addressed store, hash-verified

58 * the batch for crossing `max_rows`; the caller marks the import failed.
59 */
60export async function bulkInsertImportBatch(
61 data: BulkImportBatch,
62 table: TableDefinition,
63 requestId: string
64): Promise<{ inserted: number; lastOrderKey: string | null }> {
65 for (let i = 0; i < data.rows.length; i++) {
66 const sizeValidation = validateRowSize(data.rows[i])
67 if (!sizeValidation.valid) {
68 throw new Error(`Row ${i + 1}: ${sizeValidation.errors.join(', ')}`)
69 }
70 const schemaValidation = coerceRowToSchema(data.rows[i], table.schema)
71 if (!schemaValidation.valid) {
72 throw new Error(`Row ${i + 1}: ${schemaValidation.errors.join(', ')}`)
73 }
74 }
75
76 const uniqueColumns = getUniqueColumns(table.schema)
77 if (uniqueColumns.length > 0) {
78 const uniqueResult = await checkBatchUniqueConstraintsDb(
79 data.tableId,
80 data.rows,
81 table.schema,
82 db
83 )
84 if (!uniqueResult.valid) {
85 throw new Error(
86 uniqueResult.errors.map((e) => `Row ${e.row + 1}: ${e.errors.join(', ')}`).join('; ')
87 )
88 }
89 }
90
91 const now = new Date()
92 // Import worker is the table's sole writer; append keys after the anchor the caller threads
93 // from the previous batch's last key — no per-batch max(order_key) scan over a growing table.
94 const orderKeys = nKeysBetween(data.afterOrderKey ?? null, null, data.rows.length)
95 const rowsToInsert = data.rows.map((rowData, i) => ({
96 id: `row_${generateId().replace(/-/g, '')}`,
97 tableId: data.tableId,
98 workspaceId: data.workspaceId,
99 data: rowData,
100 position: data.startPosition + i,
101 orderKey: orderKeys[i],
102 createdAt: now,
103 updatedAt: now,
104 ...(data.userId ? { createdBy: data.userId } : {}),
105 }))
106
107 await db.insert(userTableRows).values(rowsToInsert)
108 logger.info(`[${requestId}] Bulk-imported ${rowsToInsert.length} rows into table ${data.tableId}`)
109 return {
110 inserted: rowsToInsert.length,
111 lastOrderKey: orderKeys[orderKeys.length - 1] ?? data.afterOrderKey ?? null,
112 }
113}
114
115/** Deletes every row of a table (set-based; the statement-level trigger zeroes `row_count`). */
116export async function deleteAllTableRows(tableId: string): Promise<void> {

Callers 1

flushFunction · 0.90

Calls 9

validateRowSizeFunction · 0.90
coerceRowToSchemaFunction · 0.90
getUniqueColumnsFunction · 0.90
nKeysBetweenFunction · 0.90
generateIdFunction · 0.90
joinMethod · 0.80
infoMethod · 0.80
replaceMethod · 0.65

Tested by

no test coverage detected