MCPcopy
hub / github.com/argoproj/argo-workflows / executeDAG

Method executeDAG

workflow/controller/dag.go:230–392  ·  view source on GitHub ↗
(ctx context.Context, nodeName string, tmplCtx *templateresolution.TemplateContext, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts)

Source from the content-addressed store, hash-verified

228}
229
230func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmplCtx *templateresolution.TemplateContext, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
231
232 node, err := woc.wf.GetNodeByName(nodeName)
233 if err != nil {
234 node = woc.initializeExecutableNode(ctx, nodeName, wfv1.NodeTypeDAG, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodeRunning, opts.nodeFlag, true)
235 }
236
237 defer func() {
238 node, err := woc.wf.Status.Nodes.Get(node.ID)
239 if err != nil {
240 // CRITICAL ERROR IF THIS BRANCH IS REACHED -> PANIC
241 panic(fmt.Sprintf("expected node for %s due to preceded initializeExecutableNode but couldn't find it", node.ID))
242 }
243 if node.Fulfilled() {
244 woc.killDaemonedChildren(ctx, node.ID)
245 }
246 }()
247
248 dagCtx := &dagContext{
249 boundaryName: nodeName,
250 boundaryID: node.ID,
251 tasks: tmpl.DAG.Tasks,
252 visited: make(map[string]bool),
253 tmpl: tmpl,
254 wf: woc.wf,
255 tmplCtx: tmplCtx,
256 onExitTemplate: opts.onExitTemplate,
257 dependencies: make(map[string][]string),
258 dependsLogic: make(map[string]string),
259 log: woc.log,
260 }
261
262 // Identify our target tasks. If user did not specify any, then we choose all tasks which have
263 // no dependants.
264 var targetTasks []string
265 if tmpl.DAG.Target == "" {
266 targetTasks = dagCtx.findLeafTaskNames(ctx, tmpl.DAG.Tasks)
267 } else {
268 targetTasks = strings.Split(tmpl.DAG.Target, " ")
269 }
270
271 // pre-execute daemoned tasks
272 for _, task := range tmpl.DAG.Tasks {
273 taskNode := dagCtx.getTaskNode(ctx, task.Name)
274 if err != nil {
275 continue
276 }
277 if taskNode != nil && taskNode.IsDaemoned() {
278 woc.executeDAGTask(ctx, dagCtx, task.Name)
279 }
280 }
281
282 // kick off execution of each target task asynchronously
283 onExitCompleted := true
284 for _, taskName := range targetTasks {
285 woc.executeDAGTask(ctx, dagCtx, taskName)
286
287 // The exit hook for each target task is started by executeDAGTask -> processTask.

Callers 1

executeTemplate · Method · 0.95

Calls 15

killDaemonedChildren · Method · 0.95
findLeafTaskNames · Method · 0.95
getTaskNode · Method · 0.95
executeDAGTask · Method · 0.95
GetTask · Method · 0.95
markNodeError · Method · 0.95
assessDAGPhase · Method · 0.95
GetShutdownStrategy · Method · 0.95

Tested by

no test coverage detected