({
pg,
table,
schema = 'public',
messages,
mapColumns,
debug,
}: BulkApplyMessagesToTableOptions)
| 177 | } |
| 178 | |
| 179 | export async function applyMessagesToTableWithCopy({ |
| 180 | pg, |
| 181 | table, |
| 182 | schema = 'public', |
| 183 | messages, |
| 184 | mapColumns, |
| 185 | debug, |
| 186 | }: BulkApplyMessagesToTableOptions) { |
| 187 | if (debug) console.log('applying messages with COPY') |
| 188 | |
| 189 | // Map the messages to the data to be inserted |
| 190 | const data: Record<string, any>[] = messages.map((message) => |
| 191 | mapColumns ? doMapColumns(mapColumns, message) : message.value, |
| 192 | ) |
| 193 | |
| 194 | // Get column names from the first message |
| 195 | const columns = Object.keys(data[0]) |
| 196 | |
| 197 | // Create CSV data |
| 198 | const csvData = data |
| 199 | .map((message) => { |
| 200 | return columns |
| 201 | .map((column) => { |
| 202 | const value = message[column] |
| 203 | // Escape double quotes and wrap in quotes if necessary |
| 204 | if ( |
| 205 | typeof value === 'string' && |
| 206 | (value.includes(',') || value.includes('"') || value.includes('\n')) |
| 207 | ) { |
| 208 | return `"${value.replace(/"/g, '""')}"` |
| 209 | } |
| 210 | return value === null ? '\\N' : value |
| 211 | }) |
| 212 | .join(',') |
| 213 | }) |
| 214 | .join('\n') |
| 215 | const csvBlob = new Blob([csvData], { type: 'text/csv' }) |
| 216 | |
| 217 | // Perform COPY FROM |
| 218 | await pg.query( |
| 219 | ` |
| 220 | COPY "${schema}"."${table}" (${columns.map((c) => `"${c}"`).join(', ')}) |
| 221 | FROM '/dev/blob' |
| 222 | WITH (FORMAT csv, NULL '\\N') |
| 223 | `, |
| 224 | [], |
| 225 | { |
| 226 | blob: csvBlob, |
| 227 | }, |
| 228 | ) |
| 229 | |
| 230 | if (debug) console.log(`Inserted ${messages.length} rows using COPY`) |
| 231 | } |
| 232 | |
| 233 | function doMapColumns( |
| 234 | mapColumns: MapColumns, |
nothing calls this directly
no test coverage detected