persistUpdates will update a workflow with any updates made during workflow operation. It also labels any pods as completed if we have extracted everything we need from them. NOTE: a previous implementation used Patch instead of Update, but Patch does not work with the fake CRD clientset, which makes unit testing extremely difficult.
(ctx context.Context)
| 728 | // NOTE: a previous implementation used Patch instead of Update, but Patch does not work with |
| 729 | // the fake CRD clientset which makes unit testing extremely difficult. |
| 730 | func (woc *wfOperationCtx) persistUpdates(ctx context.Context) { |
| 731 | if !woc.updated { |
| 732 | return |
| 733 | } |
| 734 | |
| 735 | diff.LogChanges(ctx, woc.orig, woc.wf) |
| 736 | |
| 737 | resource.UpdateResourceDurations(ctx, woc.wf) |
| 738 | progress.UpdateProgress(ctx, woc.wf) |
| 739 | // You MUST not call `persistUpdates` twice. |
| 740 | // * It fails: `reapplyUpdate` cannot work unless resource versions are different. |
| 741 | // * It will double the number of Kubernetes API requests. |
| 742 | if woc.orig.ResourceVersion != woc.wf.ResourceVersion { |
| 743 | woc.log.WithPanic().Error(ctx, "cannot persist updates with mismatched resource versions") |
| 744 | } |
| 745 | wfClient := woc.controller.wfclientset.ArgoprojV1alpha1().Workflows(woc.wf.Namespace) |
| 746 | |
| 747 | nodes := woc.wf.Status.Nodes |
| 748 | |
| 749 | // try and compress nodes if needed |
| 750 | err := woc.controller.hydrator.Dehydrate(ctx, woc.wf) |
| 751 | if err != nil { |
| 752 | woc.log.WithError(err).Warn(ctx, "Failed to dehydrate") |
| 753 | woc.markWorkflowError(ctx, err) |
| 754 | } |
| 755 | |
| 756 | // Release all acquired locks for a completed workflow |
| 757 | if woc.wf.Status.Synchronization != nil && woc.wf.Status.Fulfilled() { |
| 758 | if woc.controller.syncManager.ReleaseAll(ctx, woc.wf) { |
| 759 | woc.log.WithFields(logging.Fields{"key": woc.wf.Name}).Info(ctx, "Released all acquired locks") |
| 760 | } |
| 761 | } |
| 762 | |
| 763 | // Remove completed taskset status before updating the workflow. |
| 764 | err = woc.removeCompletedTaskSetStatus(ctx, nodes) |
| 765 | if err != nil { |
| 766 | woc.log.WithError(err).Warn(ctx, "error updating taskset") |
| 767 | } |
| 768 | |
| 769 | oldRV := woc.wf.ResourceVersion |
| 770 | woc.updateLastSeenVersionAnnotation(oldRV) |
| 771 | wf, err := wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{}) |
| 772 | if err != nil { |
| 773 | woc.log.WithField("error", err).WithField("reason", apierr.ReasonForError(err)).Warn(ctx, "Error updating workflow") |
| 774 | if argokubeerr.IsRequestEntityTooLargeErr(err) { |
| 775 | woc.persistWorkflowSizeLimitErr(ctx, wfClient, err) |
| 776 | return |
| 777 | } |
| 778 | if !apierr.IsConflict(err) { |
| 779 | return |
| 780 | } |
| 781 | woc.log.Info(ctx, "Re-applying updates on latest version and retrying update") |
| 782 | wf, err := woc.reapplyUpdate(ctx, wfClient, nodes) |
| 783 | if err != nil { |
| 784 | woc.wf.Labels[common.LabelKeyReApplyFailed] = "true" |
| 785 | woc.log.WithError(err).Info(ctx, "Failed to re-apply update") |
| 786 | return |
| 787 | } |
no test coverage detected