github.com/FlowiseAI/Flowise

Function processNodeOutputs

packages/server/src/utils/buildAgentflow.ts:847–949

Process node outputs and handle branching logic

({
    nodeId,
    nodeName,
    result,
    humanInput,
    graph,
    nodes,
    edges,
    nodeExecutionQueue,
    waitingNodes,
    loopCounts,
    sseStreamer,
    chatId
}: IProcessNodeOutputsParams)
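
The parameter list is easier to read with rough shapes attached to the values. The sketch below is inferred only from how the excerpted function body uses them; it is not the real `IProcessNodeOutputsParams` definition. Every type name here (`Graph`, `SketchWaitingNode`, `SketchQueueItem`, `SketchParams`) and the helper `childrenOf` are hypothetical, and parameters whose use is not visible in the excerpt (`nodeName`, `loopCounts`, `sseStreamer`, `chatId`, `edges`, `humanInput`) are deliberately left out.

```typescript
// Hypothetical shapes, inferred from usage in the excerpt; not Flowise's actual types.
type Graph = Record<string, string[]> // node id -> ids of its child nodes

interface SketchWaitingNode {
    receivedInputs: Map<string, unknown> // parent node id -> that parent's result
}

interface SketchQueueItem {
    nodeId: string
    data: unknown
    inputs: Record<string, unknown>
}

interface SketchParams {
    nodeId: string
    result: unknown
    graph: Graph
    nodes: { id: string }[]
    nodeExecutionQueue: SketchQueueItem[]
    waitingNodes: Map<string, SketchWaitingNode>
}

// Same lookup as `graph[nodeId] || []` in the function body: a node with no
// entry in the adjacency map is treated as having no children.
function childrenOf(graph: Graph, nodeId: string): string[] {
    return graph[nodeId] || []
}

const sampleParams: SketchParams = {
    nodeId: 'start',
    result: { text: 'hello' },
    graph: { start: ['agent', 'tool'], agent: ['end'] },
    nodes: [{ id: 'start' }, { id: 'agent' }, { id: 'tool' }, { id: 'end' }],
    nodeExecutionQueue: [],
    waitingNodes: new Map()
}

const startChildren = childrenOf(sampleParams.graph, sampleParams.nodeId) // ['agent', 'tool']
const leafChildren = childrenOf(sampleParams.graph, 'end') // [] because 'end' has no entry
```

The adjacency-map form keeps child lookup a single property access, and the `|| []` fallback lets leaf nodes be omitted from `graph` entirely.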

Source from the content-addressed store, hash-verified

/**
 * Process node outputs and handle branching logic
 */
async function processNodeOutputs({
    nodeId,
    nodeName,
    result,
    humanInput,
    graph,
    nodes,
    edges,
    nodeExecutionQueue,
    waitingNodes,
    loopCounts,
    sseStreamer,
    chatId
}: IProcessNodeOutputsParams): Promise<{ humanInput?: IHumanInput }> {
    logger.debug(`\n🔄 Processing outputs from node: ${nodeId}`)

    let updatedHumanInput = humanInput

    const childNodeIds = graph[nodeId] || []
    logger.debug(` 👉 Child nodes: [${childNodeIds.join(', ')}]`)

    const currentNode = nodes.find((n) => n.id === nodeId)
    if (!currentNode) return { humanInput: updatedHumanInput }

    // Get nodes to ignore based on conditions
    const ignoreNodeIds = await determineNodesToIgnore(currentNode, result, edges, nodeId)
    if (ignoreNodeIds.length) {
        logger.debug(` ⏭️ Skipping nodes: [${ignoreNodeIds.join(', ')}]`)
    }

    for (const childId of childNodeIds) {
        if (ignoreNodeIds.includes(childId)) continue

        const childNode = nodes.find((n) => n.id === childId)
        if (!childNode) continue

        logger.debug(` 📍 Processing child node: ${childId}`)

        let waitingNode = waitingNodes.get(childId)

        if (!waitingNode) {
            logger.debug(` 🆕 First time seeing node ${childId} - analyzing dependencies`)
            waitingNode = setupNodeDependencies(childId, edges, nodes)
            waitingNodes.set(childId, waitingNode)
        }

        waitingNode.receivedInputs.set(nodeId, result)
        logger.debug(` ➕ Added input from ${nodeId}`)

        // Check if node is ready to execute
        if (hasReceivedRequiredInputs(waitingNode)) {
            logger.debug(` ✅ Node ${childId} ready for execution!`)
            waitingNodes.delete(childId)
            nodeExecutionQueue.push({
                nodeId: childId,
                data: combineNodeInputs(waitingNode.receivedInputs),
                inputs: Object.fromEntries(waitingNode.receivedInputs)
            })
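
The readiness check in the loop above is a fan-in join: a child node is queued only after every upstream node it depends on has delivered a result. The following is a minimal TypeScript sketch of that pattern, not Flowise's implementation. The `expectedInputs` field and the functions `setupDeps`, `isReady`, and `deliver` are hypothetical stand-ins; the real `setupNodeDependencies`, `hasReceivedRequiredInputs`, and `combineNodeInputs` are not shown in this excerpt and may treat conditional or optional inputs differently.

```typescript
// Hypothetical shapes for illustration only; not Flowise's actual types.
interface Edge {
    source: string
    target: string
}
interface WaitingNode {
    nodeId: string
    expectedInputs: Set<string> // assumption: ids of all upstream nodes
    receivedInputs: Map<string, unknown> // mirrors the field used in the excerpt
}
interface QueueItem {
    nodeId: string
    inputs: Record<string, unknown>
}

// Assumed stand-in for setupNodeDependencies: expect one input per incoming edge.
function setupDeps(nodeId: string, edges: Edge[]): WaitingNode {
    const expectedInputs = new Set<string>()
    edges.forEach((e) => {
        if (e.target === nodeId) expectedInputs.add(e.source)
    })
    return { nodeId, expectedInputs, receivedInputs: new Map<string, unknown>() }
}

// Assumed stand-in for hasReceivedRequiredInputs: every expected parent has reported.
function isReady(w: WaitingNode): boolean {
    let ready = true
    w.expectedInputs.forEach((id) => {
        if (!w.receivedInputs.has(id)) ready = false
    })
    return ready
}

// Record one parent's result for a child and enqueue the child once it is ready.
function deliver(
    parentId: string,
    result: unknown,
    childId: string,
    edges: Edge[],
    waiting: Map<string, WaitingNode>,
    queue: QueueItem[]
): void {
    let w = waiting.get(childId)
    if (!w) {
        // First time this child is seen: work out which parents it waits for.
        w = setupDeps(childId, edges)
        waiting.set(childId, w)
    }
    w.receivedInputs.set(parentId, result)
    if (isReady(w)) {
        waiting.delete(childId)
        const inputs: Record<string, unknown> = {}
        w.receivedInputs.forEach((value, key) => {
            inputs[key] = value
        })
        queue.push({ nodeId: childId, inputs })
    }
}

// Usage: node "c" has two parents, so it is queued only after both report.
const demoEdges: Edge[] = [
    { source: 'a', target: 'c' },
    { source: 'b', target: 'c' }
]
const demoWaiting = new Map<string, WaitingNode>()
const demoQueue: QueueItem[] = []

deliver('a', 'result-a', 'c', demoEdges, demoWaiting, demoQueue) // "c" still waits for "b"
deliver('b', 'result-b', 'c', demoEdges, demoWaiting, demoQueue) // "c" is now enqueued
```

Keeping partially satisfied children in a map keyed by node id, and deleting the entry at the moment the child is enqueued, means a later delivery starts a fresh wait instead of re-queuing stale inputs.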

Callers 1

executeAgentFlow · Function · 0.85

Calls 8

determineNodesToIgnore · Function · 0.85
setupNodeDependencies · Function · 0.85
hasReceivedRequiredInputs · Function · 0.85
combineNodeInputs · Function · 0.85
set · Method · 0.80
streamTokenEvent · Method · 0.65
get · Method · 0.45
delete · Method · 0.45

Tested by

no test coverage detected