MCPcopy
hub / github.com/electric-sql/pglite / commitUpToLsn

Function commitUpToLsn

packages/pglite-sync/src/index.ts:181–335  ·  view source on GitHub ↗
(targetLsn: Lsn)

Source from the content-addressed store, hash-verified

179 } as const
180
181 const commitUpToLsn = async (targetLsn: Lsn) => {
182 // We need to collect all the messages for each shape that we need to commit
183 const messagesToCommit = new Map<string, ChangeMessage<Row<unknown>>[]>(
184 Object.keys(shapes).map((shapeName) => [shapeName, []]),
185 )
186 for (const [shapeName, shapeChanges] of changes.entries()) {
187 const messagesForShape = messagesToCommit.get(shapeName)!
188 for (const lsn of shapeChanges.keys()) {
189 if (lsn <= targetLsn) {
190 for (const message of shapeChanges.get(lsn)!) {
191 messagesForShape.push(message)
192 }
193 shapeChanges.delete(lsn)
194 }
195 }
196 }
197
198 await pg.transaction(async (tx) => {
199 if (debug) {
200 console.time('commit')
201 }
202
203 // Set the syncing flag to true during this transaction so that
204 // user defined triggers on the table are able to chose how to run
205 // during a sync
206 await tx.exec(`SET LOCAL ${metadataSchema}.syncing = true;`)
207
208 for (const [shapeName, initialMessages] of messagesToCommit.entries()) {
209 const shape = shapes[shapeName]
210 let messages = initialMessages
211
212 // If we need to truncate the table, do so
213 if (truncateNeeded.has(shapeName)) {
214 if (debug) {
215 console.log('truncating table', shape.table)
216 }
217 if (shape.onMustRefetch) {
218 await shape.onMustRefetch(tx)
219 } else {
220 const schema = shape.schema || 'public'
221 await tx.exec(`DELETE FROM "${schema}"."${shape.table}";`)
222 }
223 truncateNeeded.delete(shapeName)
224 }
225
226 // Apply the changes to the table
227 if (!useInsert) {
228 // We can do a `COPY FROM`/json_to_recordset to insert the initial data
229 // Split messageAggregator into initial inserts and remaining messages
230 const initialInserts: InsertChangeMessage[] = []
231 const remainingMessages: ChangeMessage<any>[] = []
232 let foundNonInsert = false
233 for (const message of messages) {
234 if (!foundNonInsert && message.headers.operation === 'insert') {
235 initialInserts.push(message as InsertChangeMessage)
236 } else {
237 foundNonInsert = true
238 remainingMessages.push(message)

Callers 1

syncShapesToTablesFunction · 0.85

Calls 7

applyInsertsToTableFunction · 0.90
applyMessageToTableFunction · 0.90
updateSubscriptionStateFunction · 0.90
onInitialSyncFunction · 0.85
execMethod · 0.80
rollbackMethod · 0.80
logMethod · 0.45

Tested by

no test coverage detected