markNodePhase marks a node with the given phase, creating the node if necessary and handles timestamps
(ctx context.Context, nodeName string, phase wfv1.NodePhase, message ...string)
| 2932 | |
| 2933 | // markNodePhase marks a node with the given phase, creating the node if necessary and handles timestamps |
| 2934 | func (woc *wfOperationCtx) markNodePhase(ctx context.Context, nodeName string, phase wfv1.NodePhase, message ...string) *wfv1.NodeStatus { |
| 2935 | node, err := woc.wf.GetNodeByName(nodeName) |
| 2936 | if err != nil { |
| 2937 | woc.log.WithFields(logging.Fields{"workflowName": woc.wf.Name, "nodeName": nodeName, "phase": phase, "message": message}).Warn(ctx, "workflow node uninitialized when marking new phase") |
| 2938 | node = &wfv1.NodeStatus{} |
| 2939 | } |
| 2940 | // if we not in a running state (not expecting task results) |
| 2941 | // and transition into a state that ensures we will never run mark the task results synced |
| 2942 | if node.Phase != wfv1.NodeRunning && phase.FailedOrError() && node.TaskResultSynced != nil { |
| 2943 | tmp := true |
| 2944 | node.TaskResultSynced = &tmp |
| 2945 | } |
| 2946 | if node.Phase != phase { |
| 2947 | if node.Phase.Fulfilled(node.TaskResultSynced) { |
| 2948 | woc.log.WithFields(logging.Fields{"nodeName": node.Name, "fromPhase": node.Phase, "toPhase": phase}). |
| 2949 | Error(ctx, "node is already fulfilled") |
| 2950 | } |
| 2951 | woc.log.WithFields(logging.Fields{"node": node.ID, "fromPhase": node.Phase, "toPhase": phase}).Info(ctx, "node phase changed") |
| 2952 | node.Phase = phase |
| 2953 | woc.updated = true |
| 2954 | } |
| 2955 | if len(message) > 0 { |
| 2956 | if message[0] != node.Message { |
| 2957 | woc.log.WithFields(logging.Fields{"node": node.ID, "message": message[0]}).Info(ctx, "node message changed") |
| 2958 | node.Message = message[0] |
| 2959 | woc.updated = true |
| 2960 | } |
| 2961 | } |
| 2962 | if node.Fulfilled() && node.FinishedAt.IsZero() { |
| 2963 | node.FinishedAt = metav1.Time{Time: time.Now().UTC()} |
| 2964 | woc.log.WithFields(logging.Fields{"node": node.ID, "finishedAt": node.FinishedAt}).Info(ctx, "node finished") |
| 2965 | woc.updated = true |
| 2966 | } |
| 2967 | woc.wf.Status.Nodes.Set(ctx, node.ID, *node) |
| 2968 | return node |
| 2969 | } |
| 2970 | |
| 2971 | func (woc *wfOperationCtx) getPodByNode(node *wfv1.NodeStatus) (*apiv1.Pod, error) { |
| 2972 | if node.Type != wfv1.NodeTypePod { |