({
pg,
table,
schema = 'public',
messages,
mapColumns,
debug,
}: BulkApplyMessagesToTableOptions)
| 124 | } |
| 125 | |
| 126 | export async function applyMessagesToTableWithJson({ |
| 127 | pg, |
| 128 | table, |
| 129 | schema = 'public', |
| 130 | messages, |
| 131 | mapColumns, |
| 132 | debug, |
| 133 | }: BulkApplyMessagesToTableOptions) { |
| 134 | if (debug) console.log('applying messages with json_to_recordset') |
| 135 | |
| 136 | // Map the messages to the data to be inserted |
| 137 | const data: Record<string, object>[] = messages.map((message) => |
| 138 | mapColumns ? doMapColumns(mapColumns, message) : message.value, |
| 139 | ) |
| 140 | const columns = ( |
| 141 | await pg.query<{ |
| 142 | column_name: string |
| 143 | udt_name: string |
| 144 | data_type: string |
| 145 | }>( |
| 146 | ` |
| 147 | SELECT column_name, udt_name, data_type |
| 148 | FROM information_schema.columns |
| 149 | WHERE table_name = $1 AND table_schema = $2 |
| 150 | `, |
| 151 | [table, schema], |
| 152 | ) |
| 153 | ).rows.filter((x) => |
| 154 | Object.prototype.hasOwnProperty.call(data[0], x.column_name), |
| 155 | ) |
| 156 | |
| 157 | const MAX = 10_000 |
| 158 | for (let i = 0; i < data.length; i += MAX) { |
| 159 | const maxdata = data.slice(i, i + MAX) |
| 160 | await pg.query( |
| 161 | ` |
| 162 | INSERT INTO "${schema}"."${table}" |
| 163 | SELECT x.* from json_to_recordset($1) as x(${columns |
| 164 | .map( |
| 165 | (x) => |
| 166 | `${x.column_name} ${x.udt_name.replace(/^_/, '')}` + |
| 167 | (x.data_type === 'ARRAY' ? `[]` : ''), |
| 168 | ) |
| 169 | .join(', ')}) |
| 170 | `, |
| 171 | [maxdata], |
| 172 | ) |
| 173 | } |
| 174 | |
| 175 | if (debug) |
| 176 | console.log(`Inserted ${messages.length} rows using json_to_recordset`) |
| 177 | } |
| 178 | |
| 179 | export async function applyMessagesToTableWithCopy({ |
| 180 | pg, |
nothing calls this directly
no test coverage detected