operate is the main operator logic of a workflow. It evaluates the current state of the workflow and its pods, and decides how to proceed down the execution path. TODO: an error returned by this method should result in requeuing the workflow to be retried at a later time. As you must not call `persistUpdates` twice, you must not call `operate` twice.
(ctx context.Context)
| 191 | // later time |
| 192 | // As you must not call `persistUpdates` twice, you must not call `operate` twice. |
| 193 | func (woc *wfOperationCtx) operate(ctx context.Context) { |
| 194 | defer argoruntime.RecoverFromPanic(ctx, woc.log) |
| 195 | |
| 196 | defer func() { |
| 197 | woc.persistUpdates(ctx) |
| 198 | }() |
| 199 | defer func() { |
| 200 | if r := recover(); r != nil { |
| 201 | woc.log.WithFields(logging.Fields{"stack": string(debug.Stack()), "r": r}).Error(ctx, "Recovered from panic") |
| 202 | if rerr, ok := r.(error); ok { |
| 203 | woc.markWorkflowError(ctx, rerr) |
| 204 | } else { |
| 205 | woc.markWorkflowError(ctx, fmt.Errorf("%v", r)) |
| 206 | } |
| 207 | woc.controller.metrics.OperationPanic(ctx) |
| 208 | } |
| 209 | }() |
| 210 | |
| 211 | woc.log.WithFields(logging.Fields{"phase": woc.wf.Status.Phase, "resourceVersion": woc.wf.ObjectMeta.ResourceVersion, "lastSeenVersion": woc.wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion]}).Info(ctx, "Processing workflow") |
| 212 | |
| 213 | // Set the Execute workflow spec for execution |
| 214 | // ExecWF is a runtime execution spec which merged from Wf, WFT and Wfdefault |
| 215 | err := woc.setExecWorkflow(ctx) |
| 216 | if err != nil { |
| 217 | woc.log.WithError(err).Error(ctx, "Unable to set ExecWorkflow") |
| 218 | return |
| 219 | } |
| 220 | |
| 221 | if woc.wf.Status.ArtifactRepositoryRef == nil { |
| 222 | ref, err := woc.controller.artifactRepositories.Resolve(ctx, woc.execWf.Spec.ArtifactRepositoryRef, woc.wf.Namespace) |
| 223 | if err != nil { |
| 224 | woc.markWorkflowError(ctx, fmt.Errorf("failed to resolve artifact repository: %w", err)) |
| 225 | return |
| 226 | } |
| 227 | woc.wf.Status.ArtifactRepositoryRef = ref |
| 228 | woc.updated = true |
| 229 | } |
| 230 | |
| 231 | repo, err := woc.controller.artifactRepositories.Get(ctx, woc.wf.Status.ArtifactRepositoryRef) |
| 232 | if err != nil { |
| 233 | woc.markWorkflowError(ctx, fmt.Errorf("failed to get artifact repository: %v", err)) |
| 234 | return |
| 235 | } |
| 236 | woc.artifactRepository = repo |
| 237 | |
| 238 | woc.addArtifactGCFinalizer(ctx) |
| 239 | |
| 240 | // Reconciliation of Outputs (Artifacts). See ReportOutputs() of executor.go. |
| 241 | woc.taskResultReconciliation(ctx) |
| 242 | |
| 243 | // Do artifact GC if task result reconciliation is complete. |
| 244 | if woc.wf.Status.Fulfilled() { |
| 245 | if err := woc.garbageCollectArtifacts(ctx); err != nil { |
| 246 | woc.log.WithError(err).Error(ctx, "failed to GC artifacts") |
| 247 | return |
| 248 | } |
| 249 | } else { |
| 250 | woc.log.Debug(ctx, "Skipping artifact GC") |