MCPcopy Index your code
hub / github.com/simstudioai/sim / dispatchDueDrains

Function dispatchDueDrains

apps/sim/lib/data-drains/dispatcher.ts:64–186  ·  view source on GitHub ↗
(now: Date = new Date())

Source from the content-addressed store, hash-verified

62 * belonging to orgs that have lapsed off the enterprise plan are skipped.
63 */
64export async function dispatchDueDrains(now: Date = new Date()): Promise<{
65 candidates: number
66 dispatched: number
67 skipped: number
68 reaped: number
69}> {
70 const { reaped } = await reapOrphanedRuns(now)
71
72 const hourlyCutoff = new Date(now.getTime() - HOUR_MS + CADENCE_BUFFER_MS)
73 const dailyCutoff = new Date(now.getTime() - DAY_MS + CADENCE_BUFFER_MS)
74
75 const duePredicate = and(
76 eq(dataDrains.enabled, true),
77 or(
78 isNull(dataDrains.lastRunAt),
79 and(eq(dataDrains.scheduleCadence, 'hourly'), lt(dataDrains.lastRunAt, hourlyCutoff)),
80 and(eq(dataDrains.scheduleCadence, 'daily'), lt(dataDrains.lastRunAt, dailyCutoff))
81 )
82 )
83
84 const candidates = await db
85 .select({
86 id: dataDrains.id,
87 organizationId: dataDrains.organizationId,
88 lastRunAt: dataDrains.lastRunAt,
89 })
90 .from(dataDrains)
91 .where(duePredicate)
92
93 if (candidates.length === 0) {
94 return { candidates: 0, dispatched: 0, skipped: 0, reaped }
95 }
96
97 // Self-hosted deployments have no subscription infra; `DATA_DRAINS_ENABLED`
98 // is the global on/off there. Cache per-org so a multi-drain org pays one
99 // billing lookup.
100 const enterpriseCache = new Map<string, boolean>()
101 const isEnterprise = async (orgId: string): Promise<boolean> => {
102 if (!isBillingEnabled) return true
103 const cached = enterpriseCache.get(orgId)
104 if (cached !== undefined) return cached
105 const result = await isOrganizationOnEnterprisePlan(orgId)
106 enterpriseCache.set(orgId, result)
107 return result
108 }
109
110 const queue = await getJobQueue()
111 let dispatched = 0
112 let skipped = 0
113
114 for (const candidate of candidates) {
115 let enterprise: boolean
116 try {
117 enterprise = await isEnterprise(candidate.organizationId)
118 } catch (error) {
119 // A billing-API failure for one org must not abort the whole batch —
120 // skip this drain and let the next cron tick retry it.
121 logger.warn('Enterprise check failed; skipping drain', {

Callers 2

dispatcher.test.ts · File · 0.90
route.ts · File · 0.90

Calls 10

getJobQueue · Function · 0.90
toError · Function · 0.90
reapOrphanedRuns · Function · 0.85
error · Method · 0.80
info · Method · 0.80
isEnterprise · Function · 0.70
warn · Method · 0.65
set · Method · 0.65
enqueue · Method · 0.65
eq · Function · 0.50

Tested by

no test coverage detected