({
pg,
table,
schema = 'public',
messages,
mapColumns,
debug,
}: BulkApplyMessagesToTableOptions)
| 90 | } |
| 91 | |
/**
 * Inserts a batch of messages into `"schema"."table"` using multi-row,
 * parameterized `INSERT` statements.
 *
 * Each message becomes one row: `doMapColumns(mapColumns, message)` when
 * `mapColumns` is provided, otherwise `message.value`. The column list is
 * taken from the keys of the first row and reused for every row. Rows are
 * sent in chunks so that a single statement carries at most 32,000 bind
 * parameters (unless one row alone needs more than that).
 *
 * An empty `messages` array is a no-op: no query is issued.
 *
 * @param options.pg - Client whose `query(sql, values)` executes each statement.
 * @param options.table - Target table name (quoted as an identifier).
 * @param options.schema - Target schema name; defaults to `'public'`.
 * @param options.messages - Messages to insert, one row per message.
 * @param options.mapColumns - Optional mapping passed through to `doMapColumns`.
 * @param options.debug - When truthy, logs the rows and a completion message.
 */
export async function applyInsertsToTable({
  pg,
  table,
  schema = 'public',
  messages,
  mapColumns,
  debug,
}: BulkApplyMessagesToTableOptions) {
  // Nothing to insert. Without this guard the column lookup below would read
  // the keys of a non-existent first row and throw.
  if (messages.length === 0) return

  // Map the messages to the data to be inserted
  const data: Record<string, object>[] = messages.map((message) =>
    mapColumns ? doMapColumns(mapColumns, message) : message.value,
  )

  if (debug) console.log('inserting', data)

  // Quote an identifier, doubling any embedded double quote so that the name
  // cannot terminate the quoted identifier early.
  const quoteIdent = (name: string) => `"${name.replace(/"/g, '""')}"`

  // Get column names from the first message
  const columns = Object.keys(data[0])
  const columnList = columns.map(quoteIdent).join(', ')
  const target = `${quoteIdent(schema)}.${quoteIdent(table)}`

  // Cap the number of bind parameters per statement. The lower bound of one
  // row per statement guarantees the loop below always advances.
  const maxParams = 32_000
  const rowsPerStatement = Math.max(1, Math.floor(maxParams / columns.length))

  for (let start = 0; start < data.length; start += rowsPerStatement) {
    // slice the data to avoid too many parameters
    const chunk = data.slice(start, start + rowsPerStatement)

    // Placeholders are numbered from $1 within each statement, row by row.
    const placeholders = chunk
      .map(
        (_row, rowIdx) =>
          `(${columns
            .map((_col, colIdx) => `$${rowIdx * columns.length + colIdx + 1}`)
            .join(', ')})`,
      )
      .join(', ')

    const sql = `
      INSERT INTO ${target}
      (${columnList})
      VALUES
      ${placeholders}
    `
    // Flatten values in the same row-major order as the placeholders.
    const values = chunk.flatMap((row) => columns.map((column) => row[column]))
    await pg.query(sql, values)
  }

  if (debug) console.log(`Inserted ${messages.length} rows using INSERT`)
}
| 125 | |
| 126 | export async function applyMessagesToTableWithJson({ |
| 127 | pg, |
no test coverage detected