(payload: TableImportPayload)
| 79 | * fails. Progress and the terminal state are surfaced via the table-events SSE stream. |
| 80 | */ |
| 81 | export async function runTableImport(payload: TableImportPayload): Promise<void> { |
| 82 | const { importId, tableId, workspaceId, userId, fileKey, fileName, delimiter, mode } = payload |
| 83 | const requestId = generateId().slice(0, 8) |
| 84 | // Hoisted so `finally` can destroy it on any failure — otherwise the storage HTTP body leaks |
| 85 | // open until it times out. |
| 86 | let source: Readable | undefined |
| 87 | |
| 88 | try { |
| 89 | const loaded = await getTableById(tableId, { includeArchived: true }) |
| 90 | if (!loaded) throw new Error(`Import target table ${tableId} not found`) |
| 91 | const table = loaded |
| 92 | |
| 93 | // Total byte size for the progress estimate — a cheap HEAD, no download. May be missing on the |
| 94 | // local dev provider; it then falls back to 0 and the bar stays indeterminate (rows still show). |
| 95 | const totalBytes = (await headObject(fileKey, 'workspace'))?.size ?? 0 |
| 96 | |
| 97 | // Stream the file rather than buffering it — a ~1M-row import must never be held in memory. |
| 98 | source = await downloadFileStream({ key: fileKey, context: 'workspace' }) |
| 99 | |
| 100 | // Append must continue after the existing rows; create/replace start empty. Read once up |
| 101 | // front (the import is the table's sole writer) and assign contiguous positions / threaded |
| 102 | // order keys from it. |
| 103 | const basePosition = mode === 'append' ? await nextImportStartPosition(tableId) : 0 |
| 104 | let lastOrderKey = mode === 'append' ? await nextImportStartOrderKey(tableId) : null |
| 105 | |
| 106 | // Append keeps the existing rows; create/replace start from empty (replace deletes |
| 107 | // existing rows in resolveSetup). Per-batch capacity is checked against this base + the |
| 108 | // running total, so a stream that crosses the plan limit fails within one batch. |
| 109 | const existingRowCount = mode === 'append' ? table.rowCount : 0 |
| 110 | |
| 111 | // Count bytes as they flow so the row total can be extrapolated from byte progress. |
| 112 | let bytesRead = 0 |
| 113 | const byteCounter = new Transform({ |
| 114 | transform(chunk: Buffer, _enc, cb) { |
| 115 | bytesRead += chunk.length |
| 116 | cb(null, chunk) |
| 117 | }, |
| 118 | }) |
| 119 | |
| 120 | const parser = createCsvParser(delimiter) |
| 121 | // `.pipe` doesn't forward source errors; forward so the iterator throws. |
| 122 | source.on('error', (err) => parser.destroy(err)) |
| 123 | byteCounter.on('error', (err) => parser.destroy(err)) |
| 124 | source.pipe(byteCounter).pipe(parser) |
| 125 | |
| 126 | let schema: TableSchema | null = null |
| 127 | let headerToColumn: Map<string, string> | null = null |
| 128 | let inserted = 0 |
| 129 | let lastReported = 0 |
| 130 | const sample: Record<string, unknown>[] = [] |
| 131 | let batch: Record<string, unknown>[] = [] |
| 132 | |
| 133 | /** |
| 134 | * Resolve the schema + header→column mapping from the buffered sample (runs once). |
| 135 | * `create` infers a fresh schema and overwrites the placeholder; `append`/`replace` |
| 136 | * map onto the existing schema, optionally auto-creating `createColumns` first. |
| 137 | */ |
| 138 | const resolveSetup = async () => { |
no test coverage detected