MCPcopy
hub / github.com/argoproj/argo-workflows / operate

Method operate

workflow/controller/operator.go:193–537  ·  view source on GitHub ↗

operate is the main operator logic of a workflow. It evaluates the current state of the workflow and its pods, and decides how to proceed down the execution path. TODO: an error returned by this method should result in requeuing the workflow to be retried at a later time. As you must not call `persistUpdates` twice, you must not call `operate` twice.

(ctx context.Context)

Source from the content-addressed store, hash-verified

191// later time
192// As you must not call `persistUpdates` twice, you must not call `operate` twice.
193func (woc *wfOperationCtx) operate(ctx context.Context) {
194 defer argoruntime.RecoverFromPanic(ctx, woc.log)
195
196 defer func() {
197 woc.persistUpdates(ctx)
198 }()
199 defer func() {
200 if r := recover(); r != nil {
201 woc.log.WithFields(logging.Fields{"stack": string(debug.Stack()), "r": r}).Error(ctx, "Recovered from panic")
202 if rerr, ok := r.(error); ok {
203 woc.markWorkflowError(ctx, rerr)
204 } else {
205 woc.markWorkflowError(ctx, fmt.Errorf("%v", r))
206 }
207 woc.controller.metrics.OperationPanic(ctx)
208 }
209 }()
210
211 woc.log.WithFields(logging.Fields{"phase": woc.wf.Status.Phase, "resourceVersion": woc.wf.ObjectMeta.ResourceVersion, "lastSeenVersion": woc.wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion]}).Info(ctx, "Processing workflow")
212
213 // Set the Execute workflow spec for execution
214 // ExecWF is a runtime execution spec which merged from Wf, WFT and Wfdefault
215 err := woc.setExecWorkflow(ctx)
216 if err != nil {
217 woc.log.WithError(err).Error(ctx, "Unable to set ExecWorkflow")
218 return
219 }
220
221 if woc.wf.Status.ArtifactRepositoryRef == nil {
222 ref, err := woc.controller.artifactRepositories.Resolve(ctx, woc.execWf.Spec.ArtifactRepositoryRef, woc.wf.Namespace)
223 if err != nil {
224 woc.markWorkflowError(ctx, fmt.Errorf("failed to resolve artifact repository: %w", err))
225 return
226 }
227 woc.wf.Status.ArtifactRepositoryRef = ref
228 woc.updated = true
229 }
230
231 repo, err := woc.controller.artifactRepositories.Get(ctx, woc.wf.Status.ArtifactRepositoryRef)
232 if err != nil {
233 woc.markWorkflowError(ctx, fmt.Errorf("failed to get artifact repository: %v", err))
234 return
235 }
236 woc.artifactRepository = repo
237
238 woc.addArtifactGCFinalizer(ctx)
239
240 // Reconciliation of Outputs (Artifacts). See ReportOutputs() of executor.go.
241 woc.taskResultReconciliation(ctx)
242
243 // Do artifact GC if task result reconciliation is complete.
244 if woc.wf.Status.Fulfilled() {
245 if err := woc.garbageCollectArtifacts(ctx); err != nil {
246 woc.log.WithError(err).Error(ctx, "failed to GC artifacts")
247 return
248 }
249 } else {
250 woc.log.Debug(ctx, "Skipping artifact GC")

Calls 15

persistUpdatesMethod · 0.95
markWorkflowErrorMethod · 0.95
setExecWorkflowMethod · 0.95
markWorkflowFailedMethod · 0.95
markWorkflowPhaseMethod · 0.95
computeMetricsMethod · 0.95
createPDBResourceMethod · 0.95