(targetLsn: Lsn)
| 179 | } as const |
| 180 | |
| 181 | const commitUpToLsn = async (targetLsn: Lsn) => { |
| 182 | // We need to collect all the messages for each shape that we need to commit |
| 183 | const messagesToCommit = new Map<string, ChangeMessage<Row<unknown>>[]>( |
| 184 | Object.keys(shapes).map((shapeName) => [shapeName, []]), |
| 185 | ) |
| 186 | for (const [shapeName, shapeChanges] of changes.entries()) { |
| 187 | const messagesForShape = messagesToCommit.get(shapeName)! |
| 188 | for (const lsn of shapeChanges.keys()) { |
| 189 | if (lsn <= targetLsn) { |
| 190 | for (const message of shapeChanges.get(lsn)!) { |
| 191 | messagesForShape.push(message) |
| 192 | } |
| 193 | shapeChanges.delete(lsn) |
| 194 | } |
| 195 | } |
| 196 | } |
| 197 | |
| 198 | await pg.transaction(async (tx) => { |
| 199 | if (debug) { |
| 200 | console.time('commit') |
| 201 | } |
| 202 | |
| 203 | // Set the syncing flag to true during this transaction so that |
| 204 | // user defined triggers on the table are able to chose how to run |
| 205 | // during a sync |
| 206 | await tx.exec(`SET LOCAL ${metadataSchema}.syncing = true;`) |
| 207 | |
| 208 | for (const [shapeName, initialMessages] of messagesToCommit.entries()) { |
| 209 | const shape = shapes[shapeName] |
| 210 | let messages = initialMessages |
| 211 | |
| 212 | // If we need to truncate the table, do so |
| 213 | if (truncateNeeded.has(shapeName)) { |
| 214 | if (debug) { |
| 215 | console.log('truncating table', shape.table) |
| 216 | } |
| 217 | if (shape.onMustRefetch) { |
| 218 | await shape.onMustRefetch(tx) |
| 219 | } else { |
| 220 | const schema = shape.schema || 'public' |
| 221 | await tx.exec(`DELETE FROM "${schema}"."${shape.table}";`) |
| 222 | } |
| 223 | truncateNeeded.delete(shapeName) |
| 224 | } |
| 225 | |
| 226 | // Apply the changes to the table |
| 227 | if (!useInsert) { |
| 228 | // We can do a `COPY FROM`/json_to_recordset to insert the initial data |
| 229 | // Split messageAggregator into initial inserts and remaining messages |
| 230 | const initialInserts: InsertChangeMessage[] = [] |
| 231 | const remainingMessages: ChangeMessage<any>[] = [] |
| 232 | let foundNonInsert = false |
| 233 | for (const message of messages) { |
| 234 | if (!foundNonInsert && message.headers.operation === 'insert') { |
| 235 | initialInserts.push(message as InsertChangeMessage) |
| 236 | } else { |
| 237 | foundNonInsert = true |
| 238 | remainingMessages.push(message) |
no test coverage detected
searching dependent graphs…