(ctx context.Context, nodeName string, tmplCtx *templateresolution.TemplateContext, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts)
| 228 | } |
| 229 | |
| 230 | func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmplCtx *templateresolution.TemplateContext, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) { |
| 231 | |
| 232 | node, err := woc.wf.GetNodeByName(nodeName) |
| 233 | if err != nil { |
| 234 | node = woc.initializeExecutableNode(ctx, nodeName, wfv1.NodeTypeDAG, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodeRunning, opts.nodeFlag, true) |
| 235 | } |
| 236 | |
| 237 | defer func() { |
| 238 | node, err := woc.wf.Status.Nodes.Get(node.ID) |
| 239 | if err != nil { |
| 240 | // CRITICAL ERROR IF THIS BRANCH IS REACHED -> PANIC |
| 241 | panic(fmt.Sprintf("expected node for %s due to preceded initializeExecutableNode but couldn't find it", node.ID)) |
| 242 | } |
| 243 | if node.Fulfilled() { |
| 244 | woc.killDaemonedChildren(ctx, node.ID) |
| 245 | } |
| 246 | }() |
| 247 | |
| 248 | dagCtx := &dagContext{ |
| 249 | boundaryName: nodeName, |
| 250 | boundaryID: node.ID, |
| 251 | tasks: tmpl.DAG.Tasks, |
| 252 | visited: make(map[string]bool), |
| 253 | tmpl: tmpl, |
| 254 | wf: woc.wf, |
| 255 | tmplCtx: tmplCtx, |
| 256 | onExitTemplate: opts.onExitTemplate, |
| 257 | dependencies: make(map[string][]string), |
| 258 | dependsLogic: make(map[string]string), |
| 259 | log: woc.log, |
| 260 | } |
| 261 | |
| 262 | // Identify our target tasks. If user did not specify any, then we choose all tasks which have |
| 263 | // no dependants. |
| 264 | var targetTasks []string |
| 265 | if tmpl.DAG.Target == "" { |
| 266 | targetTasks = dagCtx.findLeafTaskNames(ctx, tmpl.DAG.Tasks) |
| 267 | } else { |
| 268 | targetTasks = strings.Split(tmpl.DAG.Target, " ") |
| 269 | } |
| 270 | |
| 271 | // pre-execute daemoned tasks |
| 272 | for _, task := range tmpl.DAG.Tasks { |
| 273 | taskNode := dagCtx.getTaskNode(ctx, task.Name) |
| 274 | if err != nil { |
| 275 | continue |
| 276 | } |
| 277 | if taskNode != nil && taskNode.IsDaemoned() { |
| 278 | woc.executeDAGTask(ctx, dagCtx, task.Name) |
| 279 | } |
| 280 | } |
| 281 | |
| 282 | // kick off execution of each target task asynchronously |
| 283 | onExitCompleted := true |
| 284 | for _, taskName := range targetTasks { |
| 285 | woc.executeDAGTask(ctx, dagCtx, taskName) |
| 286 | |
| 287 | // The exit hook for each target task is started by executeDAGTask -> processTask. |
no test coverage detected