checkParallelism checks if the given template is able to be executed, considering the current active pods and workflow/template parallelism
(ctx context.Context, tmpl *wfv1.Template, node *wfv1.NodeStatus, boundaryID string)
| 3112 | |
| 3113 | // checkParallelism checks if the given template is able to be executed, considering the current active pods and workflow/template parallelism |
| 3114 | func (woc *wfOperationCtx) checkParallelism(ctx context.Context, tmpl *wfv1.Template, node *wfv1.NodeStatus, boundaryID string) error { |
| 3115 | if woc.execWf.Spec.Parallelism != nil && woc.activePods >= *woc.execWf.Spec.Parallelism { |
| 3116 | woc.log.WithFields(logging.Fields{"activePods": woc.activePods, "parallelism": *woc.execWf.Spec.Parallelism}).Info(ctx, "workflow active pod spec parallelism reached") |
| 3117 | return ErrParallelismReached |
| 3118 | } |
| 3119 | |
| 3120 | // If we are a DAG or Steps template, check if we have active pods or unsuccessful children |
| 3121 | if node != nil && (tmpl.GetType() == wfv1.TemplateTypeDAG || tmpl.GetType() == wfv1.TemplateTypeSteps) { |
| 3122 | // Check failFast |
| 3123 | if tmpl.IsFailFast() && woc.getUnsuccessfulChildren(node.ID) > 0 { |
| 3124 | if woc.getActivePods(node.ID) == 0 { |
| 3125 | if tmpl.GetType() == wfv1.TemplateTypeSteps { |
| 3126 | if leafStepGroupNode := woc.findLeafNodeWithType(ctx, node.ID, wfv1.NodeTypeStepGroup); leafStepGroupNode != nil { |
| 3127 | woc.markNodePhase(ctx, leafStepGroupNode.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled") |
| 3128 | } |
| 3129 | } |
| 3130 | woc.markNodePhase(ctx, node.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled") |
| 3131 | } |
| 3132 | return ErrParallelismReached |
| 3133 | } |
| 3134 | |
| 3135 | // Check parallelism |
| 3136 | if tmpl.HasParallelism() && woc.getActivePods(node.ID) >= *tmpl.Parallelism { |
| 3137 | woc.log.WithFields(logging.Fields{"node": node.ID, "parallelism": *tmpl.Parallelism}).Info(ctx, "template active children parallelism exceeded") |
| 3138 | return ErrParallelismReached |
| 3139 | } |
| 3140 | } |
| 3141 | |
| 3142 | // if we are about to execute a pod, make sure our parent hasn't reached its limit |
| 3143 | if boundaryID != "" && (node == nil || (node.Phase != wfv1.NodePending && node.Phase != wfv1.NodeRunning)) { |
| 3144 | boundaryNode, err := woc.wf.Status.Nodes.Get(boundaryID) |
| 3145 | if err != nil { |
| 3146 | return err |
| 3147 | } |
| 3148 | |
| 3149 | boundaryTemplate, templateStored, err := woc.GetTemplateByBoundaryID(ctx, boundaryID) |
| 3150 | if err != nil { |
| 3151 | return err |
| 3152 | } |
| 3153 | // A new template was stored during resolution, persist it |
| 3154 | if templateStored { |
| 3155 | woc.updated = true |
| 3156 | } |
| 3157 | |
| 3158 | // Check failFast |
| 3159 | if boundaryTemplate != nil && boundaryTemplate.IsFailFast() && woc.getUnsuccessfulChildren(boundaryID) > 0 { |
| 3160 | if woc.getActivePods(boundaryID) == 0 { |
| 3161 | if boundaryTemplate.GetType() == wfv1.TemplateTypeSteps { |
| 3162 | if leafStepGroupNode := woc.findLeafNodeWithType(ctx, boundaryID, wfv1.NodeTypeStepGroup); leafStepGroupNode != nil { |
| 3163 | woc.markNodePhase(ctx, leafStepGroupNode.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled") |
| 3164 | } |
| 3165 | } |
| 3166 | woc.markNodePhase(ctx, boundaryNode.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled") |
| 3167 | } |
| 3168 | return ErrParallelismReached |
| 3169 | } |
| 3170 | |
| 3171 | // Check parallelism |
no test coverage detected