(
options: SyncQueryOptions,
response: Writable,
checkpointMap: CheckpointMap,
sessionId: string,
)
| 295 | } |
| 296 | |
| 297 | private async syncPartnerAssetsV2( |
| 298 | options: SyncQueryOptions, |
| 299 | response: Writable, |
| 300 | checkpointMap: CheckpointMap, |
| 301 | sessionId: string, |
| 302 | ) { |
| 303 | const deleteType = SyncEntityType.PartnerAssetDeleteV1; |
| 304 | const deletes = this.syncRepository.partnerAsset.getDeletes({ ...options, ack: checkpointMap[deleteType] }); |
| 305 | for await (const { id, ...data } of deletes) { |
| 306 | send(response, { type: deleteType, ids: [id], data }); |
| 307 | } |
| 308 | |
| 309 | const backfillType = SyncEntityType.PartnerAssetBackfillV2; |
| 310 | const backfillCheckpoint = checkpointMap[backfillType]; |
| 311 | const partners = await this.syncRepository.partner.getCreatedAfter({ |
| 312 | ...options, |
| 313 | afterCreateId: backfillCheckpoint?.updateId, |
| 314 | }); |
| 315 | const upsertType = SyncEntityType.PartnerAssetV2; |
| 316 | const upsertCheckpoint = checkpointMap[upsertType]; |
| 317 | if (upsertCheckpoint) { |
| 318 | const endId = upsertCheckpoint.updateId; |
| 319 | |
| 320 | for (const partner of partners) { |
| 321 | const createId = partner.createId; |
| 322 | if (isEntityBackfillComplete(createId, backfillCheckpoint)) { |
| 323 | continue; |
| 324 | } |
| 325 | |
| 326 | const startId = getStartId(createId, backfillCheckpoint); |
| 327 | const backfill = this.syncRepository.partnerAsset.getBackfill( |
| 328 | { ...options, afterUpdateId: startId, beforeUpdateId: endId }, |
| 329 | partner.sharedById, |
| 330 | ); |
| 331 | |
| 332 | for await (const { updateId, ...data } of backfill) { |
| 333 | send(response, { |
| 334 | type: backfillType, |
| 335 | ids: [createId, updateId], |
| 336 | data: mapSyncAssetV2(data), |
| 337 | }); |
| 338 | } |
| 339 | |
| 340 | sendEntityBackfillCompleteAck(response, backfillType, createId); |
| 341 | } |
| 342 | } else if (partners.length > 0) { |
| 343 | await this.upsertBackfillCheckpoint({ |
| 344 | type: backfillType, |
| 345 | sessionId, |
| 346 | createId: partners.at(-1)!.createId, |
| 347 | }); |
| 348 | } |
| 349 | |
| 350 | const upserts = this.syncRepository.partnerAsset.getUpserts({ ...options, ack: checkpointMap[upsertType] }); |
| 351 | for await (const { updateId, ...data } of upserts) { |
| 352 | send(response, { type: upsertType, ids: [updateId], data: mapSyncAssetV2(data) }); |
| 353 | } |
| 354 | } |
no test coverage detected