MCPcopy
hub / github.com/argoproj/argo-workflows / markNodePhase

Method markNodePhase

workflow/controller/operator.go:2934–2969  ·  view source on GitHub ↗

markNodePhase marks a node with the given phase, creating the node if necessary and handles timestamps

(ctx context.Context, nodeName string, phase wfv1.NodePhase, message ...string)

Source from the content-addressed store, hash-verified

2932
2933// markNodePhase marks a node with the given phase, creating the node if necessary and handles timestamps
2934func (woc *wfOperationCtx) markNodePhase(ctx context.Context, nodeName string, phase wfv1.NodePhase, message ...string) *wfv1.NodeStatus {
2935 node, err := woc.wf.GetNodeByName(nodeName)
2936 if err != nil {
2937 woc.log.WithFields(logging.Fields{"workflowName": woc.wf.Name, "nodeName": nodeName, "phase": phase, "message": message}).Warn(ctx, "workflow node uninitialized when marking new phase")
2938 node = &wfv1.NodeStatus{}
2939 }
2940 // if we not in a running state (not expecting task results)
2941 // and transition into a state that ensures we will never run mark the task results synced
2942 if node.Phase != wfv1.NodeRunning && phase.FailedOrError() && node.TaskResultSynced != nil {
2943 tmp := true
2944 node.TaskResultSynced = &tmp
2945 }
2946 if node.Phase != phase {
2947 if node.Phase.Fulfilled(node.TaskResultSynced) {
2948 woc.log.WithFields(logging.Fields{"nodeName": node.Name, "fromPhase": node.Phase, "toPhase": phase}).
2949 Error(ctx, "node is already fulfilled")
2950 }
2951 woc.log.WithFields(logging.Fields{"node": node.ID, "fromPhase": node.Phase, "toPhase": phase}).Info(ctx, "node phase changed")
2952 node.Phase = phase
2953 woc.updated = true
2954 }
2955 if len(message) > 0 {
2956 if message[0] != node.Message {
2957 woc.log.WithFields(logging.Fields{"node": node.ID, "message": message[0]}).Info(ctx, "node message changed")
2958 node.Message = message[0]
2959 woc.updated = true
2960 }
2961 }
2962 if node.Fulfilled() && node.FinishedAt.IsZero() {
2963 node.FinishedAt = metav1.Time{Time: time.Now().UTC()}
2964 woc.log.WithFields(logging.Fields{"node": node.ID, "finishedAt": node.FinishedAt}).Info(ctx, "node finished")
2965 woc.updated = true
2966 }
2967 woc.wf.Status.Nodes.Set(ctx, node.ID, *node)
2968 return node
2969}
2970
2971func (woc *wfOperationCtx) getPodByNode(node *wfv1.NodeStatus) (*apiv1.Pod, error) {
2972 if node.Type != wfv1.NodeTypePod {

Callers 15

executeDAGMethod · 0.95
executeDAGTaskMethod · 0.95
processNodeRetriesMethod · 0.95
assessNodeStatusMethod · 0.95
executeTemplateMethod · 0.95
markNodeErrorMethod · 0.95
markNodePendingMethod · 0.95
checkParallelismMethod · 0.95
executeSuspendMethod · 0.95
createWorkflowPodMethod · 0.95

Calls 9

GetNodeByNameMethod · 0.80
IsZeroMethod · 0.80
WarnMethod · 0.65
WithFieldsMethod · 0.65
ErrorMethod · 0.65
InfoMethod · 0.65
FailedOrErrorMethod · 0.45
FulfilledMethod · 0.45
SetMethod · 0.45