MCPcopy
hub / github.com/argoproj/argo-workflows / persistUpdates

Method persistUpdates

workflow/controller/operator.go:730–837  ·  view source on GitHub ↗

persistUpdates will update a workflow with any updates made during workflow operation. It also labels any pods as completed if we have extracted everything we need from them. NOTE: a previous implementation used Patch instead of Update, but Patch does not work with the fake CRD clientset, which makes unit testing extremely difficult.

(ctx context.Context)

Source from the content-addressed store, hash-verified

728// NOTE: a previous implementation used Patch instead of Update, but Patch does not work with
729// the fake CRD clientset which makes unit testing extremely difficult.
730func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {
731 if !woc.updated {
732 return
733 }
734
735 diff.LogChanges(ctx, woc.orig, woc.wf)
736
737 resource.UpdateResourceDurations(ctx, woc.wf)
738 progress.UpdateProgress(ctx, woc.wf)
739 // You MUST not call `persistUpdates` twice.
740 // * Fails the `reapplyUpdate` cannot work unless resource versions are different.
741 // * It will double the number of Kubernetes API requests.
742 if woc.orig.ResourceVersion != woc.wf.ResourceVersion {
743 woc.log.WithPanic().Error(ctx, "cannot persist updates with mismatched resource versions")
744 }
745 wfClient := woc.controller.wfclientset.ArgoprojV1alpha1().Workflows(woc.wf.Namespace)
746
747 nodes := woc.wf.Status.Nodes
748
749 // try and compress nodes if needed
750 err := woc.controller.hydrator.Dehydrate(ctx, woc.wf)
751 if err != nil {
752 woc.log.WithError(err).Warn(ctx, "Failed to dehydrate")
753 woc.markWorkflowError(ctx, err)
754 }
755
756 // Release all acquired lock for completed workflow
757 if woc.wf.Status.Synchronization != nil && woc.wf.Status.Fulfilled() {
758 if woc.controller.syncManager.ReleaseAll(ctx, woc.wf) {
759 woc.log.WithFields(logging.Fields{"key": woc.wf.Name}).Info(ctx, "Released all acquired locks")
760 }
761 }
762
763 // Remove completed taskset status before update workflow.
764 err = woc.removeCompletedTaskSetStatus(ctx, nodes)
765 if err != nil {
766 woc.log.WithError(err).Warn(ctx, "error updating taskset")
767 }
768
769 oldRV := woc.wf.ResourceVersion
770 woc.updateLastSeenVersionAnnotation(oldRV)
771 wf, err := wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{})
772 if err != nil {
773 woc.log.WithField("error", err).WithField("reason", apierr.ReasonForError(err)).Warn(ctx, "Error updating workflow")
774 if argokubeerr.IsRequestEntityTooLargeErr(err) {
775 woc.persistWorkflowSizeLimitErr(ctx, wfClient, err)
776 return
777 }
778 if !apierr.IsConflict(err) {
779 return
780 }
781 woc.log.Info(ctx, "Re-applying updates on latest version and retrying update")
782 wf, err := woc.reapplyUpdate(ctx, wfClient, nodes)
783 if err != nil {
784 woc.wf.Labels[common.LabelKeyReApplyFailed] = "true"
785 woc.log.WithError(err).Info(ctx, "Failed to re-apply update")
786 return
787 }

Callers 3

operateMethod · 0.95
initManagersMethod · 0.80
processNextItemMethod · 0.80

Calls 15

markWorkflowErrorMethod · 0.95
reapplyUpdateMethod · 0.95
updateLastSeenVersionMethod · 0.95
writeBackToInformerMethod · 0.95
deleteTaskResultsMethod · 0.95
requeueAfterMethod · 0.95
queuePodsForCleanupMethod · 0.95
LogChangesFunction · 0.92

Tested by

no test coverage detected