(row: DueRow, now: Date)
| 109 | } |
| 110 | |
| 111 | async function dispatchRow(row: DueRow, now: Date): Promise<RowResult> { |
| 112 | const points = (row.pausePoints ?? {}) as Record<string, PausePoint> |
| 113 | const metadata = (row.metadata ?? {}) as Record<string, unknown> |
| 114 | const userId = typeof metadata.executorUserId === 'string' ? metadata.executorUserId : '' |
| 115 | |
| 116 | const eligiblePoints = Object.values(points).filter( |
| 117 | (point) => |
| 118 | point.pauseKind === 'time' && (!point.resumeStatus || point.resumeStatus === 'paused') |
| 119 | ) |
| 120 | const duePoints = eligiblePoints.filter((point) => { |
| 121 | if (!point.resumeAt) return false |
| 122 | const at = new Date(point.resumeAt) |
| 123 | return !Number.isNaN(at.getTime()) && at <= now |
| 124 | }) |
| 125 | |
| 126 | const failures: DispatchFailure[] = [] |
| 127 | let dispatched = 0 |
| 128 | |
| 129 | for (const point of duePoints) { |
| 130 | if (!point.contextId) continue |
| 131 | try { |
| 132 | const enqueueResult = await PauseResumeManager.enqueueOrStartResume({ |
| 133 | executionId: row.executionId, |
| 134 | workflowId: row.workflowId, |
| 135 | contextId: point.contextId, |
| 136 | resumeInput: {}, |
| 137 | userId, |
| 138 | allowedPauseKinds: ['time'], |
| 139 | }) |
| 140 | |
| 141 | if (enqueueResult.status === 'starting') { |
| 142 | // Route through `executeResumeJob` (not `PauseResumeManager.startResumeExecution` |
| 143 | // directly) so cell-context restoration + cascade-loop continuation |
| 144 | // fires. This is the same primitive the trigger.dev `resumeExecutionTask` |
| 145 | // wraps — calling it directly handles both trigger.dev-disabled local |
| 146 | // dev and trigger.dev-enabled prod identically. |
| 147 | const { executeResumeJob } = await import('@/background/resume-execution') |
| 148 | void executeResumeJob({ |
| 149 | resumeEntryId: enqueueResult.resumeEntryId, |
| 150 | resumeExecutionId: enqueueResult.resumeExecutionId, |
| 151 | pausedExecutionId: enqueueResult.pausedExecution.id, |
| 152 | contextId: enqueueResult.contextId, |
| 153 | resumeInput: enqueueResult.resumeInput, |
| 154 | userId: enqueueResult.userId, |
| 155 | workflowId: row.workflowId, |
| 156 | parentExecutionId: row.executionId, |
| 157 | }).catch((error) => { |
| 158 | logger.error('Background time-pause resume failed', { |
| 159 | executionId: row.executionId, |
| 160 | contextId: point.contextId, |
| 161 | error: toError(error).message, |
| 162 | }) |
| 163 | }) |
| 164 | } |
| 165 | dispatched++ |
| 166 | } catch (error) { |
| 167 | const message = toError(error).message |
| 168 | logger.warn('Failed to dispatch time-pause resume', { |
no test coverage detected