MCPcopy
hub / github.com/argoproj/argo-workflows / checkParallelism

Method checkParallelism

workflow/controller/operator.go:3114–3178  ·  view source on GitHub ↗

checkParallelism checks whether the given template can be executed, considering the current active pods and the workflow/template parallelism limits.

(ctx context.Context, tmpl *wfv1.Template, node *wfv1.NodeStatus, boundaryID string)

Source from the content-addressed store, hash-verified

3112
3113// checkParallelism checks if the given template is able to be executed, considering the current active pods and workflow/template parallelism
3114func (woc *wfOperationCtx) checkParallelism(ctx context.Context, tmpl *wfv1.Template, node *wfv1.NodeStatus, boundaryID string) error {
3115 if woc.execWf.Spec.Parallelism != nil && woc.activePods >= *woc.execWf.Spec.Parallelism {
3116 woc.log.WithFields(logging.Fields{"activePods": woc.activePods, "parallelism": *woc.execWf.Spec.Parallelism}).Info(ctx, "workflow active pod spec parallelism reached")
3117 return ErrParallelismReached
3118 }
3119
3120 // If we are a DAG or Steps template, check if we have active pods or unsuccessful children
3121 if node != nil && (tmpl.GetType() == wfv1.TemplateTypeDAG || tmpl.GetType() == wfv1.TemplateTypeSteps) {
3122 // Check failFast
3123 if tmpl.IsFailFast() && woc.getUnsuccessfulChildren(node.ID) > 0 {
3124 if woc.getActivePods(node.ID) == 0 {
3125 if tmpl.GetType() == wfv1.TemplateTypeSteps {
3126 if leafStepGroupNode := woc.findLeafNodeWithType(ctx, node.ID, wfv1.NodeTypeStepGroup); leafStepGroupNode != nil {
3127 woc.markNodePhase(ctx, leafStepGroupNode.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled")
3128 }
3129 }
3130 woc.markNodePhase(ctx, node.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled")
3131 }
3132 return ErrParallelismReached
3133 }
3134
3135 // Check parallelism
3136 if tmpl.HasParallelism() && woc.getActivePods(node.ID) >= *tmpl.Parallelism {
3137 woc.log.WithFields(logging.Fields{"node": node.ID, "parallelism": *tmpl.Parallelism}).Info(ctx, "template active children parallelism exceeded")
3138 return ErrParallelismReached
3139 }
3140 }
3141
3142 // if we are about to execute a pod, make sure our parent hasn't reached its limit
3143 if boundaryID != "" && (node == nil || (node.Phase != wfv1.NodePending && node.Phase != wfv1.NodeRunning)) {
3144 boundaryNode, err := woc.wf.Status.Nodes.Get(boundaryID)
3145 if err != nil {
3146 return err
3147 }
3148
3149 boundaryTemplate, templateStored, err := woc.GetTemplateByBoundaryID(ctx, boundaryID)
3150 if err != nil {
3151 return err
3152 }
3153 // A new template was stored during resolution, persist it
3154 if templateStored {
3155 woc.updated = true
3156 }
3157
3158 // Check failFast
3159 if boundaryTemplate != nil && boundaryTemplate.IsFailFast() && woc.getUnsuccessfulChildren(boundaryID) > 0 {
3160 if woc.getActivePods(boundaryID) == 0 {
3161 if boundaryTemplate.GetType() == wfv1.TemplateTypeSteps {
3162 if leafStepGroupNode := woc.findLeafNodeWithType(ctx, boundaryID, wfv1.NodeTypeStepGroup); leafStepGroupNode != nil {
3163 woc.markNodePhase(ctx, leafStepGroupNode.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled")
3164 }
3165 }
3166 woc.markNodePhase(ctx, boundaryNode.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled")
3167 }
3168 return ErrParallelismReached
3169 }
3170
3171 // Check parallelism

Callers 1

executeTemplateMethod · 0.95

Calls 12

getActivePodsMethod · 0.95
findLeafNodeWithTypeMethod · 0.95
markNodePhaseMethod · 0.95
getActiveChildrenMethod · 0.95
IsFailFastMethod · 0.80
HasParallelismMethod · 0.80
InfoMethod · 0.65
WithFieldsMethod · 0.65
GetMethod · 0.65
GetTypeMethod · 0.45

Tested by

no test coverage detected