MCPcopy
hub / github.com/argoproj/argo-workflows / reapplyUpdate

Method reapplyUpdate

workflow/controller/operator.go:890–966  ·  view source on GitHub ↗

reapplyUpdate GETs the latest version of the workflow, re-applies the updates and retries the UPDATE multiple times. For reasoning behind this technique, see: https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistenc

(ctx context.Context, wfClient v1alpha1.WorkflowInterface, nodes wfv1.Nodes)

Source from the content-addressed store, hash-verified

888// retries the UPDATE multiple times. For reasoning behind this technique, see:
889// https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency
890func (woc *wfOperationCtx) reapplyUpdate(ctx context.Context, wfClient v1alpha1.WorkflowInterface, nodes wfv1.Nodes) (*wfv1.Workflow, error) {
891 // if this condition is true, then this func will always error
892 if woc.orig.ResourceVersion != woc.wf.ResourceVersion {
893 woc.log.WithPanic().Error(ctx, "cannot re-apply update with mismatched resource versions")
894 }
895 err := woc.controller.hydrator.Hydrate(ctx, woc.orig)
896 if err != nil {
897 return nil, err
898 }
899 // First generate the patch
900 oldData, err := json.Marshal(woc.orig)
901 if err != nil {
902 return nil, err
903 }
904 woc.controller.hydrator.HydrateWithNodes(woc.wf, nodes)
905 newData, err := json.Marshal(woc.wf)
906 if err != nil {
907 return nil, err
908 }
909 patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
910 if err != nil {
911 return nil, err
912 }
913 // Next get latest version of the workflow, apply the patch and retry the update
914 attempt := 1
915 for {
916 currWf, err := wfClient.Get(ctx, woc.wf.Name, metav1.GetOptions{})
917 if err != nil {
918 return nil, err
919 }
920 // There is something about having informer indexers (introduced in v2.12) that means we are more likely to operate on the
921 // previous version of the workflow. This means under high load, a previously successful workflow could
922 // be operated on again. This can error (e.g. if any pod was deleted as part of clean-up). This check prevents that.
923 // https://github.com/argoproj/argo-workflows/issues/4798
924 if currWf.Status.Fulfilled() {
925 return nil, fmt.Errorf("must never update completed workflows")
926 }
927 err = woc.controller.hydrator.Hydrate(ctx, currWf)
928 if err != nil {
929 return nil, err
930 }
931 for id, node := range woc.wf.Status.Nodes {
932 currNode, err := currWf.Status.Nodes.Get(id)
933 if (err == nil) && currNode.Fulfilled() && node.Phase != currNode.Phase {
934 return nil, fmt.Errorf("must never update completed node %s", id)
935 }
936 }
937 currWfBytes, err := json.Marshal(currWf)
938 if err != nil {
939 return nil, err
940 }
941 newWfBytes, err := jsonpatch.MergePatch(currWfBytes, patchBytes)
942 if err != nil {
943 return nil, err
944 }
945 var newWf wfv1.Workflow
946 err = json.Unmarshal(newWfBytes, &newWf)
947 if err != nil {

Callers 2

persistUpdatesMethod · 0.95

Calls 14

ErrorMethod · 0.65
WithPanicMethod · 0.65
HydrateMethod · 0.65
HydrateWithNodesMethod · 0.65
GetMethod · 0.65
DehydrateMethod · 0.65
UpdateMethod · 0.65
InfoMethod · 0.65
WithFieldMethod · 0.65
WarnMethod · 0.65
WithErrorMethod · 0.65
MarshalMethod · 0.45

Tested by 1