MCPcopy
hub / github.com/electric-sql/pglite / applyMessagesToTableWithJson

Function applyMessagesToTableWithJson

packages/pglite-sync/src/apply.ts:126–177  ·  view source on GitHub ↗
({
  pg,
  table,
  schema = 'public',
  messages,
  mapColumns,
  debug,
}: BulkApplyMessagesToTableOptions)

Source from the content-addressed store, hash-verified

124}
125
/**
 * Bulk-inserts a batch of messages into `"schema"."table"` using Postgres
 * `json_to_recordset`.
 *
 * Each message is converted to a row object (through `doMapColumns` when
 * `mapColumns` is provided, otherwise `message.value` is used as-is). The
 * table's columns are read from `information_schema.columns` and narrowed to
 * those that appear as own properties of the FIRST row; only those columns
 * are inserted, so every row in the batch is expected to share that shape.
 * Rows are sent in chunks of at most 10,000 per `INSERT` statement.
 *
 * @param options.pg - Database handle; only `query(sql, params)` is used.
 * @param options.table - Target table name.
 * @param options.schema - Target schema name, `'public'` by default.
 * @param options.messages - Messages to insert; an empty array is a no-op.
 * @param options.mapColumns - Optional column mapping passed to `doMapColumns`.
 * @param options.debug - When truthy, progress is logged with `console.log`.
 * @throws Error when none of the table's columns match the first row's keys
 *   (this includes the case where the table has no visible columns).
 */
export async function applyMessagesToTableWithJson({
  pg,
  table,
  schema = 'public',
  messages,
  mapColumns,
  debug,
}: BulkApplyMessagesToTableOptions): Promise<void> {
  if (debug) console.log('applying messages with json_to_recordset')

  // Nothing to insert. Returning early also avoids inspecting the first row
  // of an empty batch, which would throw a TypeError further down.
  if (messages.length === 0) {
    if (debug)
      console.log(`Inserted ${messages.length} rows using json_to_recordset`)
    return
  }

  // Quote an SQL identifier, doubling any embedded double quotes.
  const quoteIdent = (name: string): string => `"${name.replace(/"/g, '""')}"`

  // Map the messages to the data to be inserted
  const data: Record<string, object>[] = messages.map((message) =>
    mapColumns ? doMapColumns(mapColumns, message) : message.value,
  )
  const firstRow = data[0]

  const tableColumns = await pg.query<{
    column_name: string
    udt_name: string
    data_type: string
  }>(
    `
      SELECT column_name, udt_name, data_type
      FROM information_schema.columns
      WHERE table_name = $1 AND table_schema = $2
      ORDER BY ordinal_position
    `,
    [table, schema],
  )

  // Only columns present on the first row take part in the insert.
  const columns = tableColumns.rows.filter((column) =>
    Object.prototype.hasOwnProperty.call(firstRow, column.column_name),
  )
  if (columns.length === 0) {
    // Without this guard the statement below would contain an empty column
    // definition list, `x()`, and fail with an opaque SQL syntax error.
    throw new Error(
      `No columns of "${schema}"."${table}" match the keys of the first message`,
    )
  }

  // The statement pieces do not depend on the chunk, so build them once.
  const target = `${quoteIdent(schema)}.${quoteIdent(table)}`
  // An explicit target list keeps the positional `SELECT x.*` aligned even
  // when the rows carry only a subset of the table's columns.
  const columnList = columns
    .map((column) => quoteIdent(column.column_name))
    .join(', ')
  // Array columns report their element type as `_<type>` in udt_name; strip
  // the underscore and append `[]` to form the record column type.
  const columnDefs = columns
    .map(
      (column) =>
        `${quoteIdent(column.column_name)} ${column.udt_name.replace(/^_/, '')}` +
        (column.data_type === 'ARRAY' ? `[]` : ''),
    )
    .join(', ')

  const MAX = 10_000
  for (let i = 0; i < data.length; i += MAX) {
    const maxdata = data.slice(i, i + MAX)
    await pg.query(
      `
        INSERT INTO ${target} (${columnList})
        SELECT x.* from json_to_recordset($1) as x(${columnDefs})
      `,
      [maxdata],
    )
  }

  if (debug)
    console.log(`Inserted ${messages.length} rows using json_to_recordset`)
}
178
179export async function applyMessagesToTableWithCopy({
180 pg,

Callers

nothing calls this directly

Calls 4

doMapColumns (Function) · 0.85
join (Method) · 0.80
query (Method) · 0.65
log (Method) · 0.45

Tested by

no test coverage detected