MCPcopy Index your code
hub / github.com/simstudioai/sim / processRecords

Function processRecords

apps/sim/lib/webhooks/polling/hubspot.ts:731–863  ·  view source on GitHub ↗
(
  records: HubSpotSearchResult[],
  webhookData: PollWebhookContext['webhookData'],
  workflowData: PollWebhookContext['workflowData'],
  objectType: string,
  eventType: HubSpotEventType,
  filterProperty: string,
  targetProperty: string | undefined,
  snapshot: PropertySnapshotState | null,
  requestId: string,
  logger: Logger
)

Source from the content-addressed store, hash-verified

729}
730
731async function processRecords(
732 records: HubSpotSearchResult[],
733 webhookData: PollWebhookContext['webhookData'],
734 workflowData: PollWebhookContext['workflowData'],
735 objectType: string,
736 eventType: HubSpotEventType,
737 filterProperty: string,
738 targetProperty: string | undefined,
739 snapshot: PropertySnapshotState | null,
740 requestId: string,
741 logger: Logger
742): Promise<{
743 processedCount: number
744 failedCount: number
745 skippedCount: number
746 highestSeenMs: number
747 maxIdAtHighestTimestamp: string
748}> {
749 let processedCount = 0
750 let failedCount = 0
751 let skippedCount = 0
752 let highestSeenMs = 0
753 let maxIdAtHighestTimestamp = ''
754 // Stop advancing the cursor at the first failure so that the failed record and all later
755 // records (sorted ASC) get re-fetched on the next poll. Without this gate, a transient
756 // failure on a record at a high timestamp would advance the cursor past it permanently.
757 let cursorFrozen = false
758
759 for (const record of records) {
760 const occurredAtMs = extractPropertyTimestampMs(record, filterProperty)
761
762 let previousValue: string | null | undefined
763 let propertyValue: string | null | undefined
764 let handledBySkip = false
765 if (eventType === 'property_changed' && targetProperty && snapshot) {
766 propertyValue = record.properties?.[targetProperty] ?? null
767 const had = snapshot.values.has(record.id)
768 previousValue = had ? snapshot.values.get(record.id) : undefined
769 if (had && (previousValue ?? null) === (propertyValue ?? null)) {
770 // Touch the snapshot so this record's entry moves to the end of the LRU order.
771 // Map.delete + Map.set re-inserts at the tail, regardless of key type.
772 snapshot.values.delete(record.id)
773 snapshot.values.set(record.id, propertyValue ?? null)
774 skippedCount++
775 handledBySkip = true
776 }
777 // Note: we do NOT pre-update the snapshot before processing. If emission fails the
778 // record must re-fetch on the next poll AND still appear as a change vs. the prior
779 // snapshot — otherwise we'd silently skip it on retry.
780 }
781
782 let handledSuccessfully = handledBySkip
783 if (!handledBySkip) {
784 try {
785 await pollingIdempotency.executeWithIdempotency(
786 'hubspot',
787 `${webhookData.id}:${objectType}:${eventType}:${record.id}:${Number.isFinite(occurredAtMs) ? occurredAtMs : record.updatedAt}`,
788 async () => {

Callers 1

pollSearchBased (Function) · 0.85

Calls 9

getErrorMessage (Function) · 0.90
compareObjectIds (Function) · 0.85
error (Method) · 0.80
get (Method) · 0.65
delete (Method) · 0.65
set (Method) · 0.65

Tested by

no test coverage detected