* Execute from a specific block using cached outputs for upstream blocks.
(
workflowId: string,
startBlockId: string,
sourceSnapshot: SerializableExecutionState
)
| 126 | * Execute from a specific block using cached outputs for upstream blocks. |
| 127 | */ |
| 128 | async executeFromBlock( |
| 129 | workflowId: string, |
| 130 | startBlockId: string, |
| 131 | sourceSnapshot: SerializableExecutionState |
| 132 | ): Promise<ExecutionResult> { |
| 133 | // Build full DAG with all blocks to compute upstream set for snapshot filtering |
| 134 | // includeAllBlocks is needed because the startBlockId might be a trigger not reachable from the main trigger |
| 135 | const dag = this.dagBuilder.build(this.workflow, { includeAllBlocks: true }) |
| 136 | |
| 137 | const executedBlocks = new Set(sourceSnapshot.executedBlocks) |
| 138 | const validation = validateRunFromBlock(startBlockId, dag, executedBlocks) |
| 139 | if (!validation.valid) { |
| 140 | throw new Error(validation.error) |
| 141 | } |
| 142 | |
| 143 | const { dirtySet, upstreamSet, reachableUpstreamSet } = computeExecutionSets(dag, startBlockId) |
| 144 | const effectiveStartBlockId = resolveContainerToSentinelStart(startBlockId, dag) ?? startBlockId |
| 145 | |
| 146 | // Extract container IDs from sentinel IDs in reachable upstream set |
| 147 | // Use reachableUpstreamSet (not upstreamSet) to preserve sibling branch outputs |
| 148 | // Example: A->C, B->C where C references A.result || B.result |
| 149 | // When running from A, B's output should be preserved for C to reference |
| 150 | const reachableContainerIds = new Set<string>() |
| 151 | for (const nodeId of reachableUpstreamSet) { |
| 152 | const loopId = extractLoopIdFromSentinel(nodeId) |
| 153 | if (loopId) reachableContainerIds.add(loopId) |
| 154 | const parallelId = extractParallelIdFromSentinel(nodeId) |
| 155 | if (parallelId) reachableContainerIds.add(parallelId) |
| 156 | } |
| 157 | |
| 158 | // Filter snapshot to include all blocks reachable from dirty blocks |
| 159 | // This preserves sibling branch outputs that dirty blocks may reference |
| 160 | const filteredBlockStates: Record<string, any> = {} |
| 161 | for (const [blockId, state] of Object.entries(sourceSnapshot.blockStates)) { |
| 162 | const aliasBaseId = stripOuterBranchSuffix(blockId) |
| 163 | const isReachableOuterBranchAlias = |
| 164 | aliasBaseId !== blockId && |
| 165 | Array.from(reachableUpstreamSet).some( |
| 166 | (reachableId) => stripCloneSuffixes(reachableId) === aliasBaseId |
| 167 | ) |
| 168 | if ( |
| 169 | reachableUpstreamSet.has(blockId) || |
| 170 | reachableContainerIds.has(blockId) || |
| 171 | isReachableOuterBranchAlias |
| 172 | ) { |
| 173 | filteredBlockStates[blockId] = state |
| 174 | } |
| 175 | } |
| 176 | const filteredExecutedBlocks = sourceSnapshot.executedBlocks.filter((id) => { |
| 177 | const aliasBaseId = stripOuterBranchSuffix(id) |
| 178 | const isReachableOuterBranchAlias = |
| 179 | aliasBaseId !== id && |
| 180 | Array.from(reachableUpstreamSet).some( |
| 181 | (reachableId) => stripCloneSuffixes(reachableId) === aliasBaseId |
| 182 | ) |
| 183 | return ( |
| 184 | reachableUpstreamSet.has(id) || reachableContainerIds.has(id) || isReachableOuterBranchAlias |
| 185 | ) |
no test coverage detected