reapplyUpdate GETs the latest version of the workflow, re-applies the updates and retries the UPDATE multiple times. For reasoning behind this technique, see: https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency
(ctx context.Context, wfClient v1alpha1.WorkflowInterface, nodes wfv1.Nodes)
| 888 | // retries the UPDATE multiple times. For reasoning behind this technique, see: |
| 889 | // https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency |
| 890 | func (woc *wfOperationCtx) reapplyUpdate(ctx context.Context, wfClient v1alpha1.WorkflowInterface, nodes wfv1.Nodes) (*wfv1.Workflow, error) { |
| 891 | // if this condition is true, then this func will always error |
| 892 | if woc.orig.ResourceVersion != woc.wf.ResourceVersion { |
| 893 | woc.log.WithPanic().Error(ctx, "cannot re-apply update with mismatched resource versions") |
| 894 | } |
| 895 | err := woc.controller.hydrator.Hydrate(ctx, woc.orig) |
| 896 | if err != nil { |
| 897 | return nil, err |
| 898 | } |
| 899 | // First generate the patch |
| 900 | oldData, err := json.Marshal(woc.orig) |
| 901 | if err != nil { |
| 902 | return nil, err |
| 903 | } |
| 904 | woc.controller.hydrator.HydrateWithNodes(woc.wf, nodes) |
| 905 | newData, err := json.Marshal(woc.wf) |
| 906 | if err != nil { |
| 907 | return nil, err |
| 908 | } |
| 909 | patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) |
| 910 | if err != nil { |
| 911 | return nil, err |
| 912 | } |
| 913 | // Next get latest version of the workflow, apply the patch and retry the update |
| 914 | attempt := 1 |
| 915 | for { |
| 916 | currWf, err := wfClient.Get(ctx, woc.wf.Name, metav1.GetOptions{}) |
| 917 | if err != nil { |
| 918 | return nil, err |
| 919 | } |
| 920 | // There is something about having informer indexers (introduced in v2.12) that means we are more likely to operate on the |
| 921 | // previous version of the workflow. This means under high load, a previously successful workflow could |
| 922 | // be operated on again. This can error (e.g. if any pod was deleted as part of clean-up). This check prevents that. |
| 923 | // https://github.com/argoproj/argo-workflows/issues/4798 |
| 924 | if currWf.Status.Fulfilled() { |
| 925 | return nil, fmt.Errorf("must never update completed workflows") |
| 926 | } |
| 927 | err = woc.controller.hydrator.Hydrate(ctx, currWf) |
| 928 | if err != nil { |
| 929 | return nil, err |
| 930 | } |
| 931 | for id, node := range woc.wf.Status.Nodes { |
| 932 | currNode, err := currWf.Status.Nodes.Get(id) |
| 933 | if (err == nil) && currNode.Fulfilled() && node.Phase != currNode.Phase { |
| 934 | return nil, fmt.Errorf("must never update completed node %s", id) |
| 935 | } |
| 936 | } |
| 937 | currWfBytes, err := json.Marshal(currWf) |
| 938 | if err != nil { |
| 939 | return nil, err |
| 940 | } |
| 941 | newWfBytes, err := jsonpatch.MergePatch(currWfBytes, patchBytes) |
| 942 | if err != nil { |
| 943 | return nil, err |
| 944 | } |
| 945 | var newWf wfv1.Workflow |
| 946 | err = json.Unmarshal(newWfBytes, &newWf) |
| 947 | if err != nil { |