executeDAGTask traverses and executes the upward chain of dependencies of a task
(ctx context.Context, dagCtx *dagContext, taskName string)
| 416 | |
| 417 | // executeDAGTask traverses and executes the upward chain of dependencies of a task |
| 418 | func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContext, taskName string) { |
| 419 | if _, ok := dagCtx.visited[taskName]; ok { |
| 420 | return |
| 421 | } |
| 422 | dagCtx.visited[taskName] = true |
| 423 | |
| 424 | node := dagCtx.getTaskNode(ctx, taskName) |
| 425 | task := dagCtx.GetTask(ctx, taskName) |
| 426 | ctx, log := woc.log.WithField("taskName", taskName).InContext(ctx) |
| 427 | if node != nil && (node.Fulfilled() || node.Phase == wfv1.NodeRunning) { |
| 428 | scope, err := woc.buildLocalScopeFromTask(ctx, dagCtx, task) |
| 429 | if err != nil { |
| 430 | log.WithError(err).Error(ctx, "Failed to build local scope from task") |
| 431 | woc.markNodeError(ctx, node.Name, err) |
| 432 | return |
| 433 | } |
| 434 | scope.addParamToScope(fmt.Sprintf("tasks.%s.status", task.Name), string(node.Phase)) |
| 435 | hookCompleted, err := woc.executeTmplLifeCycleHook(ctx, scope, dagCtx.GetTask(ctx, taskName).Hooks, node, dagCtx.boundaryID, dagCtx.tmplCtx, "tasks."+taskName) |
| 436 | if err != nil { |
| 437 | woc.markNodeError(ctx, node.Name, err) |
| 438 | } |
| 439 | // Check that all hooks have completed |
| 440 | if !hookCompleted { |
| 441 | return |
| 442 | } |
| 443 | } |
| 444 | |
| 445 | if node != nil && node.Phase.Fulfilled(node.TaskResultSynced) { |
| 446 | // Collect the completed task metrics |
| 447 | _, tmpl, _, tmplErr := dagCtx.tmplCtx.ResolveTemplate(ctx, task) |
| 448 | if tmplErr != nil { |
| 449 | woc.markNodeError(ctx, node.Name, tmplErr) |
| 450 | return |
| 451 | } |
| 452 | if err := woc.mergedTemplateDefaultsInto(tmpl); err != nil { |
| 453 | woc.markNodeError(ctx, node.Name, err) |
| 454 | return |
| 455 | } |
| 456 | if tmpl != nil && tmpl.Metrics != nil { |
| 457 | if prevNodeStatus, ok := woc.preExecutionNodeStatuses[node.ID]; ok && !prevNodeStatus.Fulfilled() { |
| 458 | localScope, realTimeScope := woc.prepareMetricScope(node) |
| 459 | woc.computeMetrics(ctx, tmpl.Metrics.Prometheus, localScope, realTimeScope, false) |
| 460 | } |
| 461 | } |
| 462 | |
| 463 | processedTmpl, err := common.ProcessArgs(ctx, tmpl, &task.Arguments, woc.globalParams, map[string]string{}, true, woc.wf.Namespace, woc.controller.configMapInformer.GetIndexer()) |
| 464 | if err != nil { |
| 465 | woc.markNodeError(ctx, node.Name, err) |
| 466 | } |
| 467 | |
| 468 | // Release any lock acquired by the completed task. |
| 469 | if processedTmpl != nil { |
| 470 | woc.controller.syncManager.Release(ctx, woc.wf, node.ID, processedTmpl.Synchronization) |
| 471 | } |
| 472 | |
| 473 | scope, err := woc.buildLocalScopeFromTask(ctx, dagCtx, task) |
| 474 | if err != nil { |
| 475 | woc.markNodeError(ctx, node.Name, err) |
no test coverage detected