podReconciliation is the process by which a workflow examines all of its related pods and updates the node state before continuing the evaluation of the workflow. It records every pod observed as completed; those pods are labeled completed=true after the workflow has been successfully persisted. Returns an error, if any, and whether pod reconciliation completed successfully.
(ctx context.Context)
| 1162 | // after successful persist of the workflow. |
| 1163 | // returns whether pod reconciliation successfully completed |
| 1164 | func (woc *wfOperationCtx) podReconciliation(ctx context.Context) (error, bool) { |
| 1165 | podList, err := woc.getAllWorkflowPods() |
| 1166 | if err != nil { |
| 1167 | woc.log.Error(ctx, "was unable to retrieve workflow pods") |
| 1168 | return err, false |
| 1169 | } |
| 1170 | seenPods := make(map[string]*apiv1.Pod) |
| 1171 | seenPodLock := &sync.Mutex{} |
| 1172 | wfNodesLock := &sync.RWMutex{} |
| 1173 | podRunningCondition := wfv1.Condition{Type: wfv1.ConditionTypePodRunning, Status: metav1.ConditionFalse} |
| 1174 | taskResultIncomplete := false |
| 1175 | performAssessment := func(pod *apiv1.Pod) { |
| 1176 | if pod == nil { |
| 1177 | return |
| 1178 | } |
| 1179 | if woc.isAgentPod(pod) { |
| 1180 | woc.updateAgentPodStatus(ctx, pod) |
| 1181 | return |
| 1182 | } |
| 1183 | nodeID := woc.nodeID(pod) |
| 1184 | seenPodLock.Lock() |
| 1185 | seenPods[nodeID] = pod |
| 1186 | seenPodLock.Unlock() |
| 1187 | |
| 1188 | wfNodesLock.Lock() |
| 1189 | defer wfNodesLock.Unlock() |
| 1190 | node, err := woc.wf.Status.Nodes.Get(nodeID) |
| 1191 | if err == nil { |
| 1192 | if newState := woc.assessNodeStatus(ctx, pod, node); newState != nil { |
| 1193 | // update if a pod deletion timestamp exists on a completed workflow, ensures this pod is always looked at |
| 1194 | // in the pod cleanup process |
| 1195 | if pod.DeletionTimestamp != nil && newState.Fulfilled() { |
| 1196 | woc.updated = true |
| 1197 | } |
| 1198 | // Check whether its taskresult is in an incompleted state. |
| 1199 | if newState.Succeeded() && woc.wf.Status.IsTaskResultIncomplete(node.ID) { |
| 1200 | woc.log.WithFields(logging.Fields{"nodeID": newState.ID}).Debug(ctx, "Taskresult of the node not yet completed") |
| 1201 | taskResultIncomplete = true |
| 1202 | return |
| 1203 | } |
| 1204 | woc.addOutputsToGlobalScope(ctx, newState.Outputs) |
| 1205 | if newState.MemoizationStatus != nil { |
| 1206 | if newState.Succeeded() { |
| 1207 | c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, newState.MemoizationStatus.CacheName) |
| 1208 | err := c.Save(ctx, newState.MemoizationStatus.Key, newState.ID, newState.Outputs) |
| 1209 | if err != nil { |
| 1210 | woc.log.WithFields(logging.Fields{"nodeID": newState.ID}).WithError(err).Error(ctx, "Failed to save node outputs to cache") |
| 1211 | newState.Phase = wfv1.NodeError |
| 1212 | newState.Message = err.Error() |
| 1213 | } |
| 1214 | } |
| 1215 | } |
| 1216 | if newState.Phase == wfv1.NodeRunning { |
| 1217 | podRunningCondition.Status = metav1.ConditionTrue |
| 1218 | } |
| 1219 | woc.wf.Status.Nodes.Set(ctx, nodeID, *newState) |
| 1220 | woc.updated = true |
| 1221 | // warning! when the node completes, the daemoned flag will be unset, so we must check the old node |