(rows: Record<string, unknown>[])
| 193 | } |
| 194 | |
/**
 * Writes one batch of parsed records into the table being imported.
 *
 * Steps, in order: confirm this run still owns the table, coerce the raw records
 * against the schema, check row capacity, bulk-insert, report row usage, advance
 * the running counters, and occasionally publish a progress event.
 *
 * @param rows Raw parsed records keyed by header name.
 * @throws ImportSupersededError when `updateJobProgress` reports that this run
 *   no longer owns the table.
 */
const flush = async (rows: Record<string, unknown>[]) => {
  // No-op for an empty batch or while the schema / header mapping is unset.
  if (rows.length === 0) return
  if (!schema || !headerToColumn) return

  // Ownership gate, checked on every batch rather than only at the progress cadence:
  // after a cancel/supersede updateJobProgress returns false, and we must bail out
  // before inserting into a table that a newer import may own.
  const stillOwner = await updateJobProgress(tableId, inserted, importId)
  if (!stillOwner) {
    throw new ImportSupersededError()
  }

  const typedRows = coerceRowsForTable(rows, schema, headerToColumn)

  const capacityLimit = await assertRowCapacity({
    workspaceId,
    currentRowCount: existingRowCount + inserted,
    addedRows: typedRows.length,
  })

  const batchRequest = {
    tableId,
    workspaceId,
    userId,
    rows: typedRows,
    startPosition: basePosition + inserted,
    afterOrderKey: lastOrderKey,
  }
  const batch = await bulkInsertImportBatch(batchRequest, { ...table, schema }, requestId)

  // Usage is reported against the row count from before this batch is added to `inserted`.
  notifyTableRowUsage({
    workspaceId,
    currentRowCount: existingRowCount + inserted,
    addedRows: batch.inserted,
    limit: capacityLimit,
  })

  inserted += batch.inserted
  lastOrderKey = batch.lastOrderKey

  // Progress cadence: publish right after the first rows land so the bar shows up
  // early, then only once per PROGRESS_INTERVAL_ROWS to avoid flooding.
  const intervalReached = inserted - lastReported >= PROGRESS_INTERVAL_ROWS
  const firstReport = lastReported === 0 && inserted > 0
  if (!intervalReached && !firstReport) return

  lastReported = inserted

  // Completion is derived from bytes consumed (exact and monotonic) instead of a
  // row-count estimate; it is capped at 99 and left undefined when the total size
  // is not positive.
  let percent: number | undefined
  if (totalBytes > 0) {
    percent = Math.min(99, Math.round((bytesRead / totalBytes) * 100))
  }

  // Fire-and-forget: the event write is intentionally not awaited.
  void appendTableEvent({
    kind: 'job',
    type: 'import',
    tableId,
    jobId: importId,
    status: 'running',
    progress: inserted,
    percent,
  })
}
| 248 | |
| 249 | let ready = false |
| 250 | for await (const record of parser as AsyncIterable<Record<string, unknown>>) { |
no test coverage detected