MCPcopy
hub / github.com/simstudioai/sim / runTableImport

Function runTableImport

apps/sim/lib/table/import-runner.ts:81–385  ·  view source on GitHub ↗
(payload: TableImportPayload)

Source from the content-addressed store, hash-verified

79 * fails. Progress and the terminal state are surfaced via the table-events SSE stream.
80 */
81 export async function runTableImport(payload: TableImportPayload): Promise<void> {
82 const { importId, tableId, workspaceId, userId, fileKey, fileName, delimiter, mode } = payload
83 const requestId = generateId().slice(0, 8)
84 // Hoisted so `finally` can destroy it on any failure — otherwise the storage HTTP body leaks
85 // open until it times out.
86 let source: Readable | undefined
87
88 try {
89 const loaded = await getTableById(tableId, { includeArchived: true })
90 if (!loaded) throw new Error(`Import target table ${tableId} not found`)
91 const table = loaded
92
93 // Total byte size for the progress estimate — a cheap HEAD, no download. May be null on
94 // the local dev provider, in which case the bar stays indeterminate (rows still show).
95 const totalBytes = (await headObject(fileKey, 'workspace'))?.size ?? 0
96
97 // Stream the file rather than buffering it — a ~1M-row import must never be held in memory.
98 source = await downloadFileStream({ key: fileKey, context: 'workspace' })
99
100 // Append must continue after the existing rows; create/replace start empty. Read once up
101 // front (the import is the table's sole writer) and assign contiguous positions / threaded
102 // order keys from it.
103 const basePosition = mode === 'append' ? await nextImportStartPosition(tableId) : 0
104 let lastOrderKey = mode === 'append' ? await nextImportStartOrderKey(tableId) : null
105
106 // Append keeps the existing rows; create/replace start from empty (replace deletes
107 // existing rows in resolveSetup). Per-batch capacity is checked against this base + the
108 // running total, so a stream that crosses the plan limit fails within one batch.
109 const existingRowCount = mode === 'append' ? table.rowCount : 0
110
111 // Count bytes as they flow so the row total can be extrapolated from byte progress.
112 let bytesRead = 0
113 const byteCounter = new Transform({
114 transform(chunk: Buffer, _enc, cb) {
115 bytesRead += chunk.length
116 cb(null, chunk)
117 },
118 })
119
120 const parser = createCsvParser(delimiter)
121 // `.pipe` doesn't forward source errors; forward so the iterator throws.
122 source.on('error', (err) => parser.destroy(err))
123 byteCounter.on('error', (err) => parser.destroy(err))
124 source.pipe(byteCounter).pipe(parser)
125
126 let schema: TableSchema | null = null
127 let headerToColumn: Map<string, string> | null = null
128 let inserted = 0
129 let lastReported = 0
130 const sample: Record<string, unknown>[] = []
131 let batch: Record<string, unknown>[] = []
132
133 /**
134 * Resolve the schema + header→column mapping from the buffered sample (runs once).
135 * `create` infers a fresh schema and overwrites the placeholder; `append`/`replace`
136 * map onto the existing schema, optionally auto-creating `createColumns` first.
137 */
138 const resolveSetup = async () => {

Callers 5

dispatchImportJob · Function · 0.90
route.ts · File · 0.90
route.ts · File · 0.90
table-import.ts · File · 0.90

Calls 15

generateId · Function · 0.90
getTableById · Function · 0.90
headObject · Function · 0.90
downloadFileStream · Function · 0.90
nextImportStartPosition · Function · 0.90
nextImportStartOrderKey · Function · 0.90
createCsvParser · Function · 0.90
markJobFailed · Function · 0.90
appendTableEvent · Function · 0.90
captureServerEvent · Function · 0.90
truncate · Function · 0.90
updateJobProgress · Function · 0.90

Tested by

no test coverage detected