| 135 | } |
| 136 | |
| 137 | func (woc *wfOperationCtx) reconcileTaskSet(ctx context.Context) error { |
| 138 | workflowTaskSet, err := woc.getWorkflowTaskSet() |
| 139 | if err != nil { |
| 140 | return err |
| 141 | } |
| 142 | |
| 143 | woc.log.Info(ctx, "TaskSet Reconciliation") |
| 144 | if workflowTaskSet != nil && len(workflowTaskSet.Status.Nodes) > 0 { |
| 145 | for nodeID, taskResult := range workflowTaskSet.Status.Nodes { |
| 146 | node, err := woc.wf.Status.Nodes.Get(nodeID) |
| 147 | if err != nil { |
| 148 | woc.log.Warn(ctx, "returning but assumed validity before") |
| 149 | woc.log.WithField("nodeID", nodeID).Error(ctx, "was unable to obtain node for nodeID") |
| 150 | return err |
| 151 | } |
| 152 | |
| 153 | node.Outputs = taskResult.Outputs.DeepCopy() |
| 154 | node.Phase = taskResult.Phase |
| 155 | node.Message = taskResult.Message |
| 156 | node.FinishedAt = metav1.Now() |
| 157 | |
| 158 | woc.wf.Status.Nodes.Set(ctx, nodeID, *node) |
| 159 | if node.MemoizationStatus != nil && node.Succeeded() { |
| 160 | c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName) |
| 161 | err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs) |
| 162 | if err != nil { |
| 163 | woc.log.WithFields(logging.Fields{"nodeID": node.ID}).WithError(err).Error(ctx, "Failed to save node outputs to cache") |
| 164 | } |
| 165 | } |
| 166 | woc.updated = true |
| 167 | } |
| 168 | } |
| 169 | return woc.createTaskSet(ctx) |
| 170 | } |
| 171 | |
| 172 | func (woc *wfOperationCtx) createTaskSet(ctx context.Context) error { |
| 173 | if len(woc.taskSet) == 0 { |