MCPcopy
hub / github.com/FlowiseAI/Flowise / _executeAgentflow

Function _executeAgentflow

packages/server/src/schedule/ScheduleExecutor.ts:147–242  ·  view source on GitHub ↗
(ctx: ScheduleExecutionContext, record: ScheduleRecord, scheduledAt: Date)

Source from the content-addressed store, hash-verified

145// ─── Internal ──────────────────────────────────────────────────────────────────
146
147async function _executeAgentflow(ctx: ScheduleExecutionContext, record: ScheduleRecord, scheduledAt: Date): Promise<any> {
148 const { appDataSource, componentNodes, telemetry, cachePool, usageCacheManager, sseStreamer, identityManager } = ctx
149 const startTime = Date.now()
150
151 const log = await scheduleService.createTriggerLog({
152 appDataSource,
153 scheduleRecordId: record.id,
154 triggerType: record.triggerType,
155 targetId: record.targetId,
156 status: ScheduleTriggerStatus.RUNNING,
157 scheduledAt,
158 workspaceId: record.workspaceId
159 })
160
161 try {
162 const chatflow = await appDataSource.getRepository(ChatFlow).findOneBy({ id: record.targetId })
163 if (!chatflow) throw new Error(`ChatFlow ${record.targetId} not found`)
164 const isAgentFlow = chatflow.type === 'AGENTFLOW'
165 if (!isAgentFlow) throw new Error(`ChatFlow ${record.targetId} is not of type AGENTFLOW`)
166
167 const workspaceId = chatflow.workspaceId ?? record.workspaceId
168
169 const workspace = await appDataSource.getRepository(Workspace).findOneBy({ id: workspaceId })
170 if (!workspace) throw new Error(`Workspace ${workspaceId} not found`)
171 const org = await appDataSource.getRepository(Organization).findOneBy({ id: workspace.organizationId })
172 if (!org) throw new Error(`Organization ${workspace.organizationId} not found`)
173
174 const orgId = org.id
175 const subscriptionId = org.subscriptionId as string
176 const productId = await identityManager.getProductIdFromSubscription(subscriptionId)
177
178 await checkPredictions(org.id, subscriptionId, usageCacheManager)
179
180 const chatId = uuidv4()
181 const incomingInput: IncomingAgentflowInput = { chatId, streaming: false }
182 if (record.scheduleInputMode === 'form') {
183 try {
184 incomingInput.form = record.defaultForm ? JSON.parse(record.defaultForm) : {}
185 } catch (e) {
186 logger.warn(`[ScheduleExecutor]: schedule ${record.id} defaultForm is not valid JSON, falling back to {}`)
187 incomingInput.form = {}
188 }
189 } else if (record.scheduleInputMode === 'none') {
190 // Use a single-space sentinel rather than an empty string, since some models do accept whitespace characters.
191 incomingInput.question = ' '
192 } else {
193 incomingInput.question = record.defaultInput
194 }
195
196 const result = await executeAgentFlow({
197 componentNodes,
198 incomingInput,
199 chatflow,
200 chatId,
201 appDataSource,
202 telemetry,
203 cachePool,
204 usageCacheManager,

Callers 1

executeScheduleJobFunction · 0.85

Calls 5

checkPredictionsFunction · 0.90
executeAgentFlowFunction · 0.90
updatePredictionsUsageFunction · 0.90
parseMethod · 0.65

Tested by

no test coverage detected