| 74 | * @returns Object containing dirtySet, upstreamSet, and reachableUpstreamSet |
| 75 | */ |
| 76 | export function computeExecutionSets(dag: DAG, startBlockId: string): ExecutionSets { |
| 77 | const dirty = new Set<string>([startBlockId]) |
| 78 | const upstream = new Set<string>() |
| 79 | const sentinelStartId = resolveContainerToSentinelStart(startBlockId, dag) |
| 80 | const traversalStartId = sentinelStartId ?? startBlockId |
| 81 | |
| 82 | if (sentinelStartId) { |
| 83 | dirty.add(sentinelStartId) |
| 84 | } |
| 85 | |
| 86 | // BFS downstream for dirty set |
| 87 | const downstreamQueue = [traversalStartId] |
| 88 | while (downstreamQueue.length > 0) { |
| 89 | const nodeId = downstreamQueue.shift()! |
| 90 | const node = dag.nodes.get(nodeId) |
| 91 | if (!node) continue |
| 92 | |
| 93 | for (const [, edge] of node.outgoingEdges) { |
| 94 | if (!dirty.has(edge.target)) { |
| 95 | dirty.add(edge.target) |
| 96 | downstreamQueue.push(edge.target) |
| 97 | } |
| 98 | } |
| 99 | } |
| 100 | |
| 101 | // BFS upstream from start block for upstream set |
| 102 | const upstreamQueue = [traversalStartId] |
| 103 | while (upstreamQueue.length > 0) { |
| 104 | const nodeId = upstreamQueue.shift()! |
| 105 | const node = dag.nodes.get(nodeId) |
| 106 | if (!node) continue |
| 107 | |
| 108 | for (const sourceId of node.incomingEdges) { |
| 109 | if (!upstream.has(sourceId)) { |
| 110 | upstream.add(sourceId) |
| 111 | upstreamQueue.push(sourceId) |
| 112 | } |
| 113 | } |
| 114 | } |
| 115 | |
| 116 | // Compute reachable upstream: all non-dirty blocks upstream of ANY dirty block |
| 117 | // This handles the case where a dirty block (like C in A->C, B->C) may reference |
| 118 | // sibling branches (like B when running from A) |
| 119 | const reachableUpstream = new Set<string>() |
| 120 | for (const dirtyNodeId of dirty) { |
| 121 | const node = dag.nodes.get(dirtyNodeId) |
| 122 | if (!node) continue |
| 123 | |
| 124 | // BFS upstream from this dirty node |
| 125 | const queue = [...node.incomingEdges] |
| 126 | while (queue.length > 0) { |
| 127 | const sourceId = queue.shift()! |
| 128 | if (reachableUpstream.has(sourceId) || dirty.has(sourceId)) continue |
| 129 | |
| 130 | reachableUpstream.add(sourceId) |
| 131 | const sourceNode = dag.nodes.get(sourceId) |
| 132 | if (sourceNode) { |
| 133 | queue.push(...sourceNode.incomingEdges) |