MCPcopy
hub / github.com/electric-sql/pglite / applyMessageToTable

Function applyMessageToTable

packages/pglite-sync/src/apply.ts:15–81  ·  view source on GitHub ↗
({
  pg,
  table,
  schema = 'public',
  message,
  mapColumns,
  primaryKey,
  debug,
}: ApplyMessageToTableOptions)

Source from the content-addressed store, hash-verified

13}
14
15export async function applyMessageToTable({
16 pg,
17 table,
18 schema = 'public',
19 message,
20 mapColumns,
21 primaryKey,
22 debug,
23}: ApplyMessageToTableOptions) {
24 const data = mapColumns ? doMapColumns(mapColumns, message) : message.value
25
26 switch (message.headers.operation) {
27 case 'insert': {
28 if (debug) console.log('inserting', data)
29 const columns = Object.keys(data)
30 return await pg.query(
31 `
32 INSERT INTO "${schema}"."${table}"
33 (${columns.map((s) => '"' + s + '"').join(', ')})
34 VALUES
35 (${columns.map((_v, i) => '$' + (i + 1)).join(', ')})
36 `,
37 columns.map((column) => data[column]),
38 )
39 }
40
41 case 'update': {
42 if (debug) console.log('updating', data)
43 const columns = Object.keys(data).filter(
44 // we don't update the primary key, they are used to identify the row
45 (column) => !primaryKey.includes(column),
46 )
47 if (columns.length === 0) return // nothing to update
48 return await pg.query(
49 `
50 UPDATE "${schema}"."${table}"
51 SET ${columns
52 .map((column, i) => '"' + column + '" = $' + (i + 1))
53 .join(', ')}
54 WHERE ${primaryKey
55 .map(
56 (column, i) =>
57 '"' + column + '" = $' + (columns.length + i + 1),
58 )
59 .join(' AND ')}
60 `,
61 [
62 ...columns.map((column) => data[column]),
63 ...primaryKey.map((column) => data[column]),
64 ],
65 )
66 }
67
68 case 'delete': {
69 if (debug) console.log('deleting', data)
70 return await pg.query(
71 `
72 DELETE FROM "${schema}"."${table}"

Callers 1

commitUpToLsnFunction · 0.90

Calls 4

doMapColumnsFunction · 0.85
joinMethod · 0.80
queryMethod · 0.65
logMethod · 0.45

Tested by

no test coverage detected