MCPcopy
hub / github.com/argoproj/argo-workflows / podReconciliation

Method podReconciliation

workflow/controller/operator.go:1164–1297  ·  view source on GitHub ↗

podReconciliation is the process by which a workflow examines all of its related pods and updates the node state before continuing the evaluation of the workflow. It records all pods that were observed as completed; these are labeled completed=true after the workflow has been successfully persisted. It returns whether pod reconciliation completed successfully.

(ctx context.Context)

Source from the content-addressed store, hash-verified

1162// after successful persist of the workflow.
1163// returns whether pod reconciliation successfully completed
1164func (woc *wfOperationCtx) podReconciliation(ctx context.Context) (error, bool) {
1165 podList, err := woc.getAllWorkflowPods()
1166 if err != nil {
1167 woc.log.Error(ctx, "was unable to retrieve workflow pods")
1168 return err, false
1169 }
1170 seenPods := make(map[string]*apiv1.Pod)
1171 seenPodLock := &sync.Mutex{}
1172 wfNodesLock := &sync.RWMutex{}
1173 podRunningCondition := wfv1.Condition{Type: wfv1.ConditionTypePodRunning, Status: metav1.ConditionFalse}
1174 taskResultIncomplete := false
1175 performAssessment := func(pod *apiv1.Pod) {
1176 if pod == nil {
1177 return
1178 }
1179 if woc.isAgentPod(pod) {
1180 woc.updateAgentPodStatus(ctx, pod)
1181 return
1182 }
1183 nodeID := woc.nodeID(pod)
1184 seenPodLock.Lock()
1185 seenPods[nodeID] = pod
1186 seenPodLock.Unlock()
1187
1188 wfNodesLock.Lock()
1189 defer wfNodesLock.Unlock()
1190 node, err := woc.wf.Status.Nodes.Get(nodeID)
1191 if err == nil {
1192 if newState := woc.assessNodeStatus(ctx, pod, node); newState != nil {
1193 // update if a pod deletion timestamp exists on a completed workflow, ensures this pod is always looked at
1194 // in the pod cleanup process
1195 if pod.DeletionTimestamp != nil && newState.Fulfilled() {
1196 woc.updated = true
1197 }
1198 // Check whether its taskresult is in an incompleted state.
1199 if newState.Succeeded() && woc.wf.Status.IsTaskResultIncomplete(node.ID) {
1200 woc.log.WithFields(logging.Fields{"nodeID": newState.ID}).Debug(ctx, "Taskresult of the node not yet completed")
1201 taskResultIncomplete = true
1202 return
1203 }
1204 woc.addOutputsToGlobalScope(ctx, newState.Outputs)
1205 if newState.MemoizationStatus != nil {
1206 if newState.Succeeded() {
1207 c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, newState.MemoizationStatus.CacheName)
1208 err := c.Save(ctx, newState.MemoizationStatus.Key, newState.ID, newState.Outputs)
1209 if err != nil {
1210 woc.log.WithFields(logging.Fields{"nodeID": newState.ID}).WithError(err).Error(ctx, "Failed to save node outputs to cache")
1211 newState.Phase = wfv1.NodeError
1212 newState.Message = err.Error()
1213 }
1214 }
1215 }
1216 if newState.Phase == wfv1.NodeRunning {
1217 podRunningCondition.Status = metav1.ConditionTrue
1218 }
1219 woc.wf.Status.Nodes.Set(ctx, nodeID, *newState)
1220 woc.updated = true
1221 // warning! when the node completes, the daemoned flag will be unset, so we must check the old node

Calls 15

getAllWorkflowPods · Method · 0.95
isAgentPod · Method · 0.95
updateAgentPodStatus · Method · 0.95
nodeID · Method · 0.95
assessNodeStatus · Method · 0.95
shouldPrintPodSpec · Method · 0.95
printPodSpecLog · Method · 0.95
applyExecutionControl · Method · 0.95
requeue · Method · 0.95
markNodeError · Method · 0.95