(now: Date = new Date())
| 37 | * — but the drain `cursor` never advanced, so re-running is safe. |
| 38 | */ |
| 39 | export async function reapOrphanedRuns(now: Date = new Date()): Promise<{ reaped: number }> { /* `now` defaults to the current time when the caller omits it */ |
| 40 | const cutoff = new Date(now.getTime() - ORPHAN_THRESHOLD_MS) /* the instant ORPHAN_THRESHOLD_MS before `now` */ |
| 41 | const reaped = await db /* awaited result is used below as an array of { id } rows */ |
| 42 | .update(dataDrainRuns) |
| 43 | .set({ |
| 44 | status: 'failed', /* matched rows are moved out of 'running' */ |
| 45 | finishedAt: now, /* stamped with the same `now` the cutoff was derived from */ |
| 46 | error: `Orphaned run reaped after exceeding ${ORPHAN_THRESHOLD_MS / 60_000}m without completion`, /* dividing by 60_000 renders the threshold in minutes, assuming the constant is in milliseconds as its name suggests */ |
| 47 | }) |
| 48 | .where(and(eq(dataDrainRuns.status, 'running'), lt(dataDrainRuns.startedAt, cutoff))) /* only rows whose status is 'running' and whose startedAt is earlier than the cutoff */ |
| 49 | .returning({ id: dataDrainRuns.id }) /* NOTE(review): presumably yields one { id } per updated row (Drizzle-style API); confirm against the db client in use */ |
| 50 | if (reaped.length > 0) { /* log only when at least one row was returned */ |
| 51 | logger.warn('Reaped orphaned data drain runs', { count: reaped.length }) |
| 52 | } |
| 53 | return { reaped: reaped.length } /* number of rows returned by the update; 0 when nothing matched */ |
| 54 | } |
| 55 | |
| 56 | /** |
| 57 | * Selects every enabled drain whose schedule is due (or has never run) and |
no test coverage detected