MCPcopy
hub / github.com/argoproj/argo-workflows / executeDAGTask

Method executeDAGTask

workflow/controller/dag.go:418–666  ·  view source on GitHub ↗

executeDAGTask traverses and executes the upward chain of dependencies of a task

(ctx context.Context, dagCtx *dagContext, taskName string)

Source from the content-addressed store, hash-verified

416
417// executeDAGTask traverses and executes the upward chain of dependencies of a task
418func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContext, taskName string) {
419 if _, ok := dagCtx.visited[taskName]; ok {
420 return
421 }
422 dagCtx.visited[taskName] = true
423
424 node := dagCtx.getTaskNode(ctx, taskName)
425 task := dagCtx.GetTask(ctx, taskName)
426 ctx, log := woc.log.WithField("taskName", taskName).InContext(ctx)
427 if node != nil && (node.Fulfilled() || node.Phase == wfv1.NodeRunning) {
428 scope, err := woc.buildLocalScopeFromTask(ctx, dagCtx, task)
429 if err != nil {
430 log.WithError(err).Error(ctx, "Failed to build local scope from task")
431 woc.markNodeError(ctx, node.Name, err)
432 return
433 }
434 scope.addParamToScope(fmt.Sprintf("tasks.%s.status", task.Name), string(node.Phase))
435 hookCompleted, err := woc.executeTmplLifeCycleHook(ctx, scope, dagCtx.GetTask(ctx, taskName).Hooks, node, dagCtx.boundaryID, dagCtx.tmplCtx, "tasks."+taskName)
436 if err != nil {
437 woc.markNodeError(ctx, node.Name, err)
438 }
439 // Check that all hooks have completed
440 if !hookCompleted {
441 return
442 }
443 }
444
445 if node != nil && node.Phase.Fulfilled(node.TaskResultSynced) {
446 // Collect the completed task metrics
447 _, tmpl, _, tmplErr := dagCtx.tmplCtx.ResolveTemplate(ctx, task)
448 if tmplErr != nil {
449 woc.markNodeError(ctx, node.Name, tmplErr)
450 return
451 }
452 if err := woc.mergedTemplateDefaultsInto(tmpl); err != nil {
453 woc.markNodeError(ctx, node.Name, err)
454 return
455 }
456 if tmpl != nil && tmpl.Metrics != nil {
457 if prevNodeStatus, ok := woc.preExecutionNodeStatuses[node.ID]; ok && !prevNodeStatus.Fulfilled() {
458 localScope, realTimeScope := woc.prepareMetricScope(node)
459 woc.computeMetrics(ctx, tmpl.Metrics.Prometheus, localScope, realTimeScope, false)
460 }
461 }
462
463 processedTmpl, err := common.ProcessArgs(ctx, tmpl, &task.Arguments, woc.globalParams, map[string]string{}, true, woc.wf.Namespace, woc.controller.configMapInformer.GetIndexer())
464 if err != nil {
465 woc.markNodeError(ctx, node.Name, err)
466 }
467
468 // Release the lock acquired by the completed task.
469 if processedTmpl != nil {
470 woc.controller.syncManager.Release(ctx, woc.wf, node.ID, processedTmpl.Synchronization)
471 }
472
473 scope, err := woc.buildLocalScopeFromTask(ctx, dagCtx, task)
474 if err != nil {
475 woc.markNodeError(ctx, node.Name, err)

Callers 1

executeDAGMethod · 0.95

Calls 15

markNodeErrorMethod · 0.95
prepareMetricScopeMethod · 0.95
computeMetricsMethod · 0.95
runOnExitNodeMethod · 0.95
addChildNodeMethod · 0.95
getOutboundNodesMethod · 0.95
initializeNodeMethod · 0.95
executeTemplateMethod · 0.95

Tested by

no test coverage detected