| 45 | } |
| 46 | |
| 47 | private parseStream(inputStream: NodeJS.ReadableStream): Promise<FileParseResult> { |
| 48 | return new Promise((resolve, reject) => { |
| 49 | let rowCount = 0 |
| 50 | let errorCount = 0 |
| 51 | let headers: string[] = [] |
| 52 | let processedContent = '' |
| 53 | const sampledRows: any[] = [] |
| 54 | const errors: string[] = [] |
| 55 | let firstRowProcessed = false |
| 56 | let aborted = false |
| 57 | |
| 58 | const parserOptions: Options = { |
| 59 | columns: true, // Use first row as headers |
| 60 | skip_empty_lines: true, // Skip empty lines |
| 61 | trim: true, // Trim whitespace |
| 62 | relax_column_count: true, // Allow variable column counts |
| 63 | relax_quotes: true, // Be lenient with quotes |
| 64 | skip_records_with_error: true, // Skip bad records |
| 65 | raw: false, |
| 66 | cast: false, |
| 67 | } |
| 68 | const parser = parse(parserOptions) |
| 69 | |
| 70 | parser.on('readable', () => { |
| 71 | let record |
| 72 | while ((record = parser.read()) !== null && !aborted) { |
| 73 | rowCount++ |
| 74 | |
| 75 | if (!firstRowProcessed && record) { |
| 76 | headers = Object.keys(record).map((h) => sanitizeTextForUTF8(String(h))) |
| 77 | processedContent = `${headers.join(', ')}\n` |
| 78 | firstRowProcessed = true |
| 79 | } |
| 80 | |
| 81 | if (rowCount <= CONFIG.MAX_PREVIEW_ROWS) { |
| 82 | try { |
| 83 | const cleanValues = Object.values(record).map((v: any) => |
| 84 | sanitizeTextForUTF8(String(v || '')) |
| 85 | ) |
| 86 | processedContent += `${cleanValues.join(', ')}\n` |
| 87 | |
| 88 | if (rowCount <= CONFIG.MAX_SAMPLE_ROWS) { |
| 89 | sampledRows.push(record) |
| 90 | } |
| 91 | } catch (err) { |
| 92 | logger.warn(`Error processing row ${rowCount}:`, err) |
| 93 | } |
| 94 | } |
| 95 | |
| 96 | if (rowCount % 10000 === 0) { |
| 97 | logger.info(`Processed ${rowCount} rows...`) |
| 98 | } |
| 99 | } |
| 100 | }) |
| 101 | |
| 102 | parser.on('skip', (err: any) => { |
| 103 | errorCount++ |
| 104 | |