( records: HubSpotSearchResult[], webhookData: PollWebhookContext['webhookData'], workflowData: PollWebhookContext['workflowData'], objectType: string, eventType: HubSpotEventType, filterProperty: string, targetProperty: string | undefined, snapshot: PropertySnapshotState | null, requestId: string, logger: Logger )
| 729 | } |
| 730 | |
| 731 | async function processRecords( |
| 732 | records: HubSpotSearchResult[], |
| 733 | webhookData: PollWebhookContext['webhookData'], |
| 734 | workflowData: PollWebhookContext['workflowData'], |
| 735 | objectType: string, |
| 736 | eventType: HubSpotEventType, |
| 737 | filterProperty: string, |
| 738 | targetProperty: string | undefined, |
| 739 | snapshot: PropertySnapshotState | null, |
| 740 | requestId: string, |
| 741 | logger: Logger |
| 742 | ): Promise<{ |
| 743 | processedCount: number |
| 744 | failedCount: number |
| 745 | skippedCount: number |
| 746 | highestSeenMs: number |
| 747 | maxIdAtHighestTimestamp: string |
| 748 | }> { |
| 749 | let processedCount = 0 |
| 750 | let failedCount = 0 |
| 751 | let skippedCount = 0 |
| 752 | let highestSeenMs = 0 |
| 753 | let maxIdAtHighestTimestamp = '' |
| 754 | // Stop advancing the cursor at the first failure so that the failed record and all later |
| 755 | // records (sorted ASC) get re-fetched on the next poll. Without this gate, a transient |
| 756 | // failure on a record at a high timestamp would advance the cursor past it permanently. |
| 757 | let cursorFrozen = false |
| 758 | |
| 759 | for (const record of records) { |
| 760 | const occurredAtMs = extractPropertyTimestampMs(record, filterProperty) |
| 761 | |
| 762 | let previousValue: string | null | undefined |
| 763 | let propertyValue: string | null | undefined |
| 764 | let handledBySkip = false |
| 765 | if (eventType === 'property_changed' && targetProperty && snapshot) { |
| 766 | propertyValue = record.properties?.[targetProperty] ?? null |
| 767 | const had = snapshot.values.has(record.id) |
| 768 | previousValue = had ? snapshot.values.get(record.id) : undefined |
| 769 | if (had && (previousValue ?? null) === (propertyValue ?? null)) { |
| 770 | // Touch the snapshot so this record's entry moves to the end of the LRU order. |
| 771 | // Map.delete + Map.set re-inserts at the tail, regardless of key type. |
| 772 | snapshot.values.delete(record.id) |
| 773 | snapshot.values.set(record.id, propertyValue ?? null) |
| 774 | skippedCount++ |
| 775 | handledBySkip = true |
| 776 | } |
| 777 | // Note: we do NOT pre-update the snapshot before processing. If emission fails the |
| 778 | // record must re-fetch on the next poll AND still appear as a change vs. the prior |
| 779 | // snapshot — otherwise we'd silently skip it on retry. |
| 780 | } |
| 781 | |
| 782 | let handledSuccessfully = handledBySkip |
| 783 | if (!handledBySkip) { |
| 784 | try { |
| 785 | await pollingIdempotency.executeWithIdempotency( |
| 786 | 'hubspot', |
| 787 | `${webhookData.id}:${objectType}:${eventType}:${record.id}:${Number.isFinite(occurredAtMs) ? occurredAtMs : record.updatedAt}`, |
| 788 | async () => { |
no test coverage detected