MCPcopy
hub / github.com/FlowiseAI/Flowise / processJob

Method processJob

packages/server/src/queue/PredictionQueue.ts:65–108  ·  view source on GitHub ↗
(data: IExecuteFlowParams | IGenerateAgentflowv2Params)

Source from the content-addressed store, hash-verified

63 }
64
65 async processJob(data: IExecuteFlowParams | IGenerateAgentflowv2Params) {
66 if (this.redisPublisher) {
67 await this.redisPublisher.connect()
68 }
69 if (this.appDataSource) data.appDataSource = this.appDataSource
70 if (this.telemetry) data.telemetry = this.telemetry
71 if (this.cachePool) data.cachePool = this.cachePool
72 if (this.usageCacheManager) data.usageCacheManager = this.usageCacheManager
73 if (this.componentNodes) data.componentNodes = this.componentNodes
74 if (this.redisPublisher) data.sseStreamer = this.redisPublisher
75
76 if (Object.prototype.hasOwnProperty.call(data, 'isAgentFlowGenerator')) {
77 logger.info(`Generating Agentflow...`)
78 const { prompt, componentNodes, toolNodes, selectedChatModel, question } = data as IGenerateAgentflowv2Params
79 const options: Record<string, any> = {
80 appDataSource: this.appDataSource,
81 databaseEntities: databaseEntities,
82 logger: logger
83 }
84 return await generateAgentflowv2_json({ prompt, componentNodes, toolNodes, selectedChatModel }, question, options)
85 }
86
87 if (Object.prototype.hasOwnProperty.call(data, 'isExecuteCustomFunction')) {
88 const executeCustomFunctionData = data as any
89 logger.info(`[${executeCustomFunctionData.orgId}]: Executing Custom Function...`)
90 return await executeCustomNodeFunction({
91 appDataSource: this.appDataSource,
92 componentNodes: this.componentNodes,
93 data: executeCustomFunctionData.data,
94 workspaceId: executeCustomFunctionData.workspaceId,
95 orgId: executeCustomFunctionData.orgId,
96 canViewVariables: executeCustomFunctionData.canViewVariables
97 })
98 }
99
100 if (this.abortControllerPool) {
101 const abortControllerId = `${data.chatflow.id}_${data.chatId}`
102 const signal = new AbortController()
103 this.abortControllerPool.add(abortControllerId, signal)
104 data.signal = signal
105 }
106
107 return await executeFlow(data)
108 }
109}

Callers

nothing calls this directly

Calls 5

executeFlowFunction · 0.90
connectMethod · 0.45
callMethod · 0.45
addMethod · 0.45

Tested by

no test coverage detected