({
pg,
table,
schema = 'public',
message,
mapColumns,
primaryKey,
debug,
}: ApplyMessageToTableOptions)
| 13 | } |
| 14 | |
| 15 | export async function applyMessageToTable({ |
| 16 | pg, |
| 17 | table, |
| 18 | schema = 'public', |
| 19 | message, |
| 20 | mapColumns, |
| 21 | primaryKey, |
| 22 | debug, |
| 23 | }: ApplyMessageToTableOptions) { |
| 24 | const data = mapColumns ? doMapColumns(mapColumns, message) : message.value |
| 25 | |
| 26 | switch (message.headers.operation) { |
| 27 | case 'insert': { |
| 28 | if (debug) console.log('inserting', data) |
| 29 | const columns = Object.keys(data) |
| 30 | return await pg.query( |
| 31 | ` |
| 32 | INSERT INTO "${schema}"."${table}" |
| 33 | (${columns.map((s) => '"' + s + '"').join(', ')}) |
| 34 | VALUES |
| 35 | (${columns.map((_v, i) => '$' + (i + 1)).join(', ')}) |
| 36 | `, |
| 37 | columns.map((column) => data[column]), |
| 38 | ) |
| 39 | } |
| 40 | |
| 41 | case 'update': { |
| 42 | if (debug) console.log('updating', data) |
| 43 | const columns = Object.keys(data).filter( |
| 44 | // we don't update the primary key columns; they are used to identify the row |
| 45 | (column) => !primaryKey.includes(column), |
| 46 | ) |
| 47 | if (columns.length === 0) return // nothing to update |
| 48 | return await pg.query( |
| 49 | ` |
| 50 | UPDATE "${schema}"."${table}" |
| 51 | SET ${columns |
| 52 | .map((column, i) => '"' + column + '" = $' + (i + 1)) |
| 53 | .join(', ')} |
| 54 | WHERE ${primaryKey |
| 55 | .map( |
| 56 | (column, i) => |
| 57 | '"' + column + '" = $' + (columns.length + i + 1), |
| 58 | ) |
| 59 | .join(' AND ')} |
| 60 | `, |
| 61 | [ |
| 62 | ...columns.map((column) => data[column]), |
| 63 | ...primaryKey.map((column) => data[column]), |
| 64 | ], |
| 65 | ) |
| 66 | } |
| 67 | |
| 68 | case 'delete': { |
| 69 | if (debug) console.log('deleting', data) |
| 70 | return await pg.query( |
| 71 | ` |
| 72 | DELETE FROM "${schema}"."${table}" |
no test coverage detected