* Process node outputs and handle branching logic
({
nodeId,
nodeName,
result,
humanInput,
graph,
nodes,
edges,
nodeExecutionQueue,
waitingNodes,
loopCounts,
sseStreamer,
chatId
}: IProcessNodeOutputsParams)
| 845 | * Process node outputs and handle branching logic |
| 846 | */ |
| 847 | async function processNodeOutputs({ |
| 848 | nodeId, |
| 849 | nodeName, |
| 850 | result, |
| 851 | humanInput, |
| 852 | graph, |
| 853 | nodes, |
| 854 | edges, |
| 855 | nodeExecutionQueue, |
| 856 | waitingNodes, |
| 857 | loopCounts, |
| 858 | sseStreamer, |
| 859 | chatId |
| 860 | }: IProcessNodeOutputsParams): Promise<{ humanInput?: IHumanInput }> { |
| 861 | logger.debug(`\nš Processing outputs from node: ${nodeId}`) |
| 862 | |
| 863 | let updatedHumanInput = humanInput |
| 864 | |
| 865 | const childNodeIds = graph[nodeId] || [] |
| 866 | logger.debug(` š Child nodes: [${childNodeIds.join(', ')}]`) |
| 867 | |
| 868 | const currentNode = nodes.find((n) => n.id === nodeId) |
| 869 | if (!currentNode) return { humanInput: updatedHumanInput } |
| 870 | |
| 871 | // Get nodes to ignore based on conditions |
| 872 | const ignoreNodeIds = await determineNodesToIgnore(currentNode, result, edges, nodeId) |
| 873 | if (ignoreNodeIds.length) { |
| 874 | logger.debug(` āļø Skipping nodes: [${ignoreNodeIds.join(', ')}]`) |
| 875 | } |
| 876 | |
| 877 | for (const childId of childNodeIds) { |
| 878 | if (ignoreNodeIds.includes(childId)) continue |
| 879 | |
| 880 | const childNode = nodes.find((n) => n.id === childId) |
| 881 | if (!childNode) continue |
| 882 | |
| 883 | logger.debug(` š Processing child node: ${childId}`) |
| 884 | |
| 885 | let waitingNode = waitingNodes.get(childId) |
| 886 | |
| 887 | if (!waitingNode) { |
| 888 | logger.debug(` š First time seeing node ${childId} - analyzing dependencies`) |
| 889 | waitingNode = setupNodeDependencies(childId, edges, nodes) |
| 890 | waitingNodes.set(childId, waitingNode) |
| 891 | } |
| 892 | |
| 893 | waitingNode.receivedInputs.set(nodeId, result) |
| 894 | logger.debug(` ā Added input from ${nodeId}`) |
| 895 | |
| 896 | // Check if node is ready to execute |
| 897 | if (hasReceivedRequiredInputs(waitingNode)) { |
| 898 | logger.debug(` ā Node ${childId} ready for execution!`) |
| 899 | waitingNodes.delete(childId) |
| 900 | nodeExecutionQueue.push({ |
| 901 | nodeId: childId, |
| 902 | data: combineNodeInputs(waitingNode.receivedInputs), |
| 903 | inputs: Object.fromEntries(waitingNode.receivedInputs) |
| 904 | }) |
No test coverage detected for `processNodeOutputs`.