(payload: TableExportPayload)
| 49 | * scratch and overwrites nothing (fresh key per attempt; failures clean up their partial upload). |
| 50 | */ |
| 51 | export async function runTableExport(payload: TableExportPayload): Promise<void> { |
| 52 | const { jobId, tableId, workspaceId, format } = payload |
| 53 | const requestId = generateId().slice(0, 8) |
| 54 | let handle: MultipartUploadHandle | null = null |
| 55 | let uploadedKey: string | null = null |
| 56 | |
| 57 | try { |
| 58 | const table = await getTableById(tableId, { includeArchived: true }) |
| 59 | if (!table) throw new Error(`Export target table ${tableId} not found`) |
| 60 | |
| 61 | const columns = table.schema.columns |
| 62 | // Stored row data is id-keyed; CSV headers and JSON keys are display names, so translate |
| 63 | // id → name on the way out (export is a name-friendly boundary). |
| 64 | const nameById = buildNameById(table.schema) |
| 65 | |
| 66 | const fileName = `${sanitizeExportFilename(table.name)}.${format}` |
| 67 | // The key is pinned up front so the streaming upload writes exactly where the download |
| 68 | // route presigns; the *returned* key (from `complete`) is recorded as the source of truth. |
| 69 | const key = `workspace/${workspaceId}/exports/${tableId}/${jobId}/${fileName}` |
| 70 | const contentType = format === 'csv' ? 'text/csv; charset=utf-8' : 'application/json' |
| 71 | |
| 72 | // Stream the serialized file straight into storage in bounded parts instead of buffering the |
| 73 | // whole thing in heap — a 1M-row export no longer holds hundreds of MB resident. |
| 74 | handle = await createMultipartUpload({ key, context: 'workspace', contentType }) |
| 75 | await handle.write( |
| 76 | format === 'csv' ? `${toCsvRow(columns.map((c) => neutralizeCsvFormula(c.name)))}\n` : '[' |
| 77 | ) |
| 78 | |
| 79 | let exported = 0 |
| 80 | let firstJsonRow = true |
| 81 | let after: { orderKey: string; id: string } | null = null |
| 82 | while (true) { |
| 83 | // Ownership gate before every page: a canceled job stops within one batch. |
| 84 | const owns = await updateJobProgress(tableId, exported, jobId) |
| 85 | if (!owns) throw new JobSupersededError() |
| 86 | |
| 87 | const page = await selectExportRowPage(table, after, EXPORT_BATCH_SIZE) |
| 88 | if (page.length === 0) break |
| 89 | |
| 90 | const pageChunks: string[] = [] |
| 91 | for (const row of page) { |
| 92 | if (format === 'csv') { |
| 93 | pageChunks.push( |
| 94 | `${toCsvRow(columns.map((c) => formatCsvValue(row.data[getColumnId(c)])))}\n` |
| 95 | ) |
| 96 | } else { |
| 97 | const prefix = firstJsonRow ? '' : ',' |
| 98 | firstJsonRow = false |
| 99 | pageChunks.push(prefix + JSON.stringify(rowDataIdToName(row.data, nameById))) |
| 100 | } |
| 101 | } |
| 102 | await handle.write(pageChunks.join('')) |
| 103 | |
| 104 | exported += page.length |
| 105 | const last = page[page.length - 1] |
| 106 | after = { orderKey: last.orderKey, id: last.id } |
| 107 | if (page.length < EXPORT_BATCH_SIZE) break |
| 108 | } |
no test coverage detected