(now: Date = new Date())
| 62 | * belonging to orgs that have lapsed off the enterprise plan are skipped. |
| 63 | */ |
| 64 | export async function dispatchDueDrains(now: Date = new Date()): Promise<{ |
| 65 | candidates: number |
| 66 | dispatched: number |
| 67 | skipped: number |
| 68 | reaped: number |
| 69 | }> { |
| 70 | const { reaped } = await reapOrphanedRuns(now) |
| 71 | |
| 72 | const hourlyCutoff = new Date(now.getTime() - HOUR_MS + CADENCE_BUFFER_MS) |
| 73 | const dailyCutoff = new Date(now.getTime() - DAY_MS + CADENCE_BUFFER_MS) |
| 74 | |
| 75 | const duePredicate = and( |
| 76 | eq(dataDrains.enabled, true), |
| 77 | or( |
| 78 | isNull(dataDrains.lastRunAt), |
| 79 | and(eq(dataDrains.scheduleCadence, 'hourly'), lt(dataDrains.lastRunAt, hourlyCutoff)), |
| 80 | and(eq(dataDrains.scheduleCadence, 'daily'), lt(dataDrains.lastRunAt, dailyCutoff)) |
| 81 | ) |
| 82 | ) |
| 83 | |
| 84 | const candidates = await db |
| 85 | .select({ |
| 86 | id: dataDrains.id, |
| 87 | organizationId: dataDrains.organizationId, |
| 88 | lastRunAt: dataDrains.lastRunAt, |
| 89 | }) |
| 90 | .from(dataDrains) |
| 91 | .where(duePredicate) |
| 92 | |
| 93 | if (candidates.length === 0) { |
| 94 | return { candidates: 0, dispatched: 0, skipped: 0, reaped } |
| 95 | } |
| 96 | |
| 97 | // Self-hosted deployments have no subscription infra; `DATA_DRAINS_ENABLED` |
| 98 | // is the global on/off there. Cache per-org so a multi-drain org pays one |
| 99 | // billing lookup. |
| 100 | const enterpriseCache = new Map<string, boolean>() |
| 101 | const isEnterprise = async (orgId: string): Promise<boolean> => { |
| 102 | if (!isBillingEnabled) return true |
| 103 | const cached = enterpriseCache.get(orgId) |
| 104 | if (cached !== undefined) return cached |
| 105 | const result = await isOrganizationOnEnterprisePlan(orgId) |
| 106 | enterpriseCache.set(orgId, result) |
| 107 | return result |
| 108 | } |
| 109 | |
| 110 | const queue = await getJobQueue() |
| 111 | let dispatched = 0 |
| 112 | let skipped = 0 |
| 113 | |
| 114 | for (const candidate of candidates) { |
| 115 | let enterprise: boolean |
| 116 | try { |
| 117 | enterprise = await isEnterprise(candidate.organizationId) |
| 118 | } catch (error) { |
| 119 | // A billing-API failure for one org must not abort the whole batch — |
| 120 | // skip this drain and let the next cron tick retry it. |
| 121 | logger.warn('Enterprise check failed; skipping drain', { |
no test coverage detected