(
ctx: ScheduleExecutionContext,
scheduleRecordId: string,
callbacks?: ScheduleExecutionCallbacks
)
| 68 | * @returns The agentflow execution result, or `undefined` if skipped. |
| 69 | */ |
| 70 | export async function executeScheduleJob( |
| 71 | ctx: ScheduleExecutionContext, |
| 72 | scheduleRecordId: string, |
| 73 | callbacks?: ScheduleExecutionCallbacks |
| 74 | ): Promise<any> { |
| 75 | const scheduledAt = new Date() |
| 76 | const { appDataSource } = ctx |
| 77 | |
| 78 | // ── 1. Load & validate record ────────────────────────────────────────── |
| 79 | const scheduleRecord = await appDataSource.getRepository(ScheduleRecord).findOneBy({ id: scheduleRecordId }) |
| 80 | |
| 81 | // If the record is missing entirely, log and skip without creating a trigger log. |
| 82 | if (!scheduleRecord) { |
| 83 | logger.warn(`[ScheduleExecutor]: Schedule ${scheduleRecordId} not found, skipping`) |
| 84 | await callbacks?.onRecordNotFoundOrDisabled?.(scheduleRecordId) |
| 85 | return undefined |
| 86 | } |
| 87 | // If the record exists but is disabled, record a SKIPPED trigger log with proper attribution. |
| 88 | if (!scheduleRecord.enabled) { |
| 89 | logger.warn(`[ScheduleExecutor]: Schedule ${scheduleRecordId} disabled, skipping`) |
| 90 | await callbacks?.onRecordNotFoundOrDisabled?.(scheduleRecordId) |
| 91 | await scheduleService.createTriggerLog({ |
| 92 | appDataSource, |
| 93 | scheduleRecordId, |
| 94 | triggerType: scheduleRecord.triggerType ?? ScheduleTriggerType.AGENTFLOW, |
| 95 | targetId: scheduleRecord.targetId, |
| 96 | status: ScheduleTriggerStatus.SKIPPED, |
| 97 | scheduledAt, |
| 98 | workspaceId: scheduleRecord.workspaceId |
| 99 | }) |
| 100 | return undefined |
| 101 | } |
| 102 | |
| 103 | // ── 2. End-date / input validation ───────────────────────────────────── |
| 104 | const isInputValid = |
| 105 | scheduleRecord.scheduleInputMode === 'text' |
| 106 | ? scheduleService.isScheduleInputValid(scheduleRecord.scheduleInputMode, scheduleRecord.defaultInput) |
| 107 | : true |
| 108 | if ((scheduleRecord.endDate && scheduledAt >= scheduleRecord.endDate) || !isInputValid) { |
| 109 | logger.debug(`[ScheduleExecutor]: Schedule ${scheduleRecordId} has passed end date or invalid input, disabling`) |
| 110 | await callbacks?.onRecordExpiredOrInvalid?.(scheduleRecord) |
| 111 | await scheduleService.createTriggerLog({ |
| 112 | appDataSource, |
| 113 | scheduleRecordId, |
| 114 | triggerType: scheduleRecord.triggerType ?? ScheduleTriggerType.AGENTFLOW, |
| 115 | targetId: scheduleRecord.targetId, |
| 116 | status: ScheduleTriggerStatus.SKIPPED, |
| 117 | scheduledAt, |
| 118 | workspaceId: scheduleRecord.workspaceId |
| 119 | }) |
| 120 | return undefined |
| 121 | } |
| 122 | |
| 123 | // ── 3. nextRunAt guard ───────────────────────────────────────────────── |
| 124 | if (scheduleRecord.nextRunAt && scheduleRecord.nextRunAt > scheduledAt) { |
| 125 | logger.debug( |
| 126 | `[ScheduleExecutor]: Scheduled time ${scheduledAt.toISOString()} is before nextRunAt ` + |
| 127 | `${scheduleRecord.nextRunAt.toISOString()} for schedule ${scheduleRecordId}, skipping` |
no test coverage detected