(ctx: ScheduleExecutionContext, record: ScheduleRecord, scheduledAt: Date)
| 145 | // ─── Internal ────────────────────────────────────────────────────────────────── |
| 146 | |
| 147 | async function _executeAgentflow(ctx: ScheduleExecutionContext, record: ScheduleRecord, scheduledAt: Date): Promise<any> { |
| 148 | const { appDataSource, componentNodes, telemetry, cachePool, usageCacheManager, sseStreamer, identityManager } = ctx |
| 149 | const startTime = Date.now() |
| 150 | |
| 151 | const log = await scheduleService.createTriggerLog({ |
| 152 | appDataSource, |
| 153 | scheduleRecordId: record.id, |
| 154 | triggerType: record.triggerType, |
| 155 | targetId: record.targetId, |
| 156 | status: ScheduleTriggerStatus.RUNNING, |
| 157 | scheduledAt, |
| 158 | workspaceId: record.workspaceId |
| 159 | }) |
| 160 | |
| 161 | try { |
| 162 | const chatflow = await appDataSource.getRepository(ChatFlow).findOneBy({ id: record.targetId }) |
| 163 | if (!chatflow) throw new Error(`ChatFlow ${record.targetId} not found`) |
| 164 | const isAgentFlow = chatflow.type === 'AGENTFLOW' |
| 165 | if (!isAgentFlow) throw new Error(`ChatFlow ${record.targetId} is not of type AGENTFLOW`) |
| 166 | |
| 167 | const workspaceId = chatflow.workspaceId ?? record.workspaceId |
| 168 | |
| 169 | const workspace = await appDataSource.getRepository(Workspace).findOneBy({ id: workspaceId }) |
| 170 | if (!workspace) throw new Error(`Workspace ${workspaceId} not found`) |
| 171 | const org = await appDataSource.getRepository(Organization).findOneBy({ id: workspace.organizationId }) |
| 172 | if (!org) throw new Error(`Organization ${workspace.organizationId} not found`) |
| 173 | |
| 174 | const orgId = org.id |
| 175 | const subscriptionId = org.subscriptionId as string |
| 176 | const productId = await identityManager.getProductIdFromSubscription(subscriptionId) |
| 177 | |
| 178 | await checkPredictions(org.id, subscriptionId, usageCacheManager) |
| 179 | |
| 180 | const chatId = uuidv4() |
| 181 | const incomingInput: IncomingAgentflowInput = { chatId, streaming: false } |
| 182 | if (record.scheduleInputMode === 'form') { |
| 183 | try { |
| 184 | incomingInput.form = record.defaultForm ? JSON.parse(record.defaultForm) : {} |
| 185 | } catch (e) { |
| 186 | logger.warn(`[ScheduleExecutor]: schedule ${record.id} defaultForm is not valid JSON, falling back to {}`) |
| 187 | incomingInput.form = {} |
| 188 | } |
| 189 | } else if (record.scheduleInputMode === 'none') { |
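| 190 | // Input mode 'none' supplies no question text: send a single-space sentinel rather than an empty string, since some models accept whitespace-only input (presumably where an empty string would be rejected; TODO confirm). |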
| 191 | incomingInput.question = ' ' |
| 192 | } else { |
| 193 | incomingInput.question = record.defaultInput |
| 194 | } |
| 195 | |
| 196 | const result = await executeAgentFlow({ |
| 197 | componentNodes, |
| 198 | incomingInput, |
| 199 | chatflow, |
| 200 | chatId, |
| 201 | appDataSource, |
| 202 | telemetry, |
| 203 | cachePool, |
| 204 | usageCacheManager, |
no test coverage detected