(
options: SyncQueryOptions,
response: Writable,
checkpointMap: CheckpointMap,
sessionId: string,
)
| 757 | } |
| 758 | |
/**
 * Streams partner stack sync events to `response` in three ordered phases:
 *
 * 1. Deletes (`PartnerStackDeleteV1`) newer than the delete checkpoint.
 * 2. Backfill (`PartnerStackBackfillV1`) for every partner returned by
 *    `partner.getCreatedAfter`, but only when an upsert checkpoint already
 *    exists. Each partner's backfill is bounded above by the upsert
 *    checkpoint's `updateId` and is followed by a backfill-complete ack.
 *    When no upsert checkpoint exists and at least one partner was returned,
 *    nothing is backfilled; a backfill checkpoint is stored at the last
 *    returned partner's `createId` instead (presumably because the upsert
 *    phase below then covers those partners — confirm against the
 *    repository queries).
 * 3. Upserts (`PartnerStackV1`) newer than the upsert checkpoint.
 *
 * @param options - Query options spread into every repository call.
 * @param response - Stream that receives the events via `send`.
 * @param checkpointMap - Checkpoints keyed by sync entity type.
 * @param sessionId - Session the backfill checkpoint is stored under.
 */
private async syncPartnerStackV1(
  options: SyncQueryOptions,
  response: Writable,
  checkpointMap: CheckpointMap,
  sessionId: string,
) {
  // Phase 1: deleted stacks.
  const removalType = SyncEntityType.PartnerStackDeleteV1;
  const removalAck = checkpointMap[removalType];
  const removedStacks = this.syncRepository.partnerStack.getDeletes({ ...options, ack: removalAck });
  for await (const { id, ...payload } of removedStacks) {
    send(response, { type: removalType, ids: [id], data: payload });
  }

  // Phase 2: per-partner backfill, or just record how far the partner list reaches.
  const backfillKind = SyncEntityType.PartnerStackBackfillV1;
  const backfillAck = checkpointMap[backfillKind];
  const partners = await this.syncRepository.partner.getCreatedAfter({
    ...options,
    afterCreateId: backfillAck?.updateId,
  });

  const upsertKind = SyncEntityType.PartnerStackV1;
  const upsertAck = checkpointMap[upsertKind];

  if (!upsertAck) {
    // No upsert checkpoint yet: skip the backfill and only store a checkpoint
    // at the last returned partner (when there is one).
    if (partners.length > 0) {
      await this.upsertBackfillCheckpoint({
        type: backfillKind,
        sessionId,
        createId: partners.at(-1)!.createId,
      });
    }
  } else {
    // Backfill never goes past what the upsert stream has already acknowledged.
    const upperBoundId = upsertAck.updateId;

    for (const partner of partners) {
      const partnerCreateId = partner.createId;
      if (isEntityBackfillComplete(partnerCreateId, backfillAck)) {
        continue;
      }

      const lowerBoundId = getStartId(partnerCreateId, backfillAck);
      const backfillRows = this.syncRepository.partnerStack.getBackfill(
        { ...options, afterUpdateId: lowerBoundId, beforeUpdateId: upperBoundId },
        partner.sharedById,
      );

      for await (const { updateId, ...payload } of backfillRows) {
        send(response, {
          type: backfillKind,
          ids: [partnerCreateId, updateId],
          data: payload,
        });
      }

      // Mark this partner's backfill as finished.
      sendEntityBackfillCompleteAck(response, backfillKind, partnerCreateId);
    }
  }

  // Phase 3: regular upserts. The checkpoint is looked up again here, after the awaits above.
  const changedStacks = this.syncRepository.partnerStack.getUpserts({
    ...options,
    ack: checkpointMap[upsertKind],
  });
  for await (const { updateId, ...payload } of changedStacks) {
    send(response, { type: upsertKind, ids: [updateId], data: payload });
  }
}
no test coverage detected