| 2861 | } |
| 2862 | |
| 2863 | func (woc *wfOperationCtx) initializeNode(ctx context.Context, nodeName string, nodeType wfv1.NodeType, templateScope string, orgTmpl wfv1.TemplateReferenceHolder, boundaryID string, phase wfv1.NodePhase, nodeFlag *wfv1.NodeFlag, omitTaskResultSynced bool, messages ...string) *wfv1.NodeStatus { |
| 2864 | woc.log.WithFields(logging.Fields{"nodeName": nodeName, "template": common.GetTemplateHolderString(orgTmpl), "boundaryID": boundaryID}).Debug(ctx, "Initializing node") |
| 2865 | |
| 2866 | nodeID := woc.wf.NodeID(nodeName) |
| 2867 | ok := woc.wf.Status.Nodes.Has(nodeID) |
| 2868 | if ok { |
| 2869 | panic(fmt.Sprintf("node %s already initialized", nodeName)) |
| 2870 | } |
| 2871 | |
| 2872 | node := wfv1.NodeStatus{ |
| 2873 | ID: nodeID, |
| 2874 | Name: nodeName, |
| 2875 | TemplateName: orgTmpl.GetTemplateName(), |
| 2876 | TemplateRef: orgTmpl.GetTemplateRef(), |
| 2877 | TemplateScope: templateScope, |
| 2878 | Type: nodeType, |
| 2879 | BoundaryID: boundaryID, |
| 2880 | Phase: phase, |
| 2881 | NodeFlag: nodeFlag, |
| 2882 | StartedAt: metav1.Time{Time: time.Now().UTC()}, |
| 2883 | EstimatedDuration: woc.estimateNodeDuration(ctx, nodeName), |
| 2884 | } |
| 2885 | |
| 2886 | if executable(nodeType) && !omitTaskResultSynced { |
| 2887 | tmp := true |
| 2888 | node.TaskResultSynced = &tmp |
| 2889 | } |
| 2890 | |
| 2891 | if boundaryNode, err := woc.wf.Status.Nodes.Get(boundaryID); err == nil { |
| 2892 | node.DisplayName = strings.TrimPrefix(node.Name, boundaryNode.Name) |
| 2893 | if stepsOrDagSeparator.MatchString(node.DisplayName) { |
| 2894 | node.DisplayName = stepsOrDagSeparator.ReplaceAllString(node.DisplayName, "") |
| 2895 | } |
| 2896 | } else { |
| 2897 | woc.log.WithField("boundaryID", boundaryID).Info(ctx, "was unable to obtain node, letting display name to be nodeName") |
| 2898 | node.DisplayName = nodeName |
| 2899 | } |
| 2900 | |
| 2901 | if node.Fulfilled() && node.FinishedAt.IsZero() { |
| 2902 | node.FinishedAt = node.StartedAt |
| 2903 | } |
| 2904 | var message string |
| 2905 | if len(messages) > 0 { |
| 2906 | message = fmt.Sprintf(" (message: %s)", messages[0]) |
| 2907 | node.Message = messages[0] |
| 2908 | } |
| 2909 | woc.wf.Status.Nodes.Set(ctx, nodeID, node) |
| 2910 | woc.log.WithFields(logging.Fields{"node": node.ID, "phase": node.Phase, "message": message}).Info(ctx, "node initialized") |
| 2911 | woc.updated = true |
| 2912 | return &node |
| 2913 | } |
| 2914 | |
| 2915 | // Update a node status with cache status |
| 2916 | func (woc *wfOperationCtx) updateAsCacheNode(ctx context.Context, node *wfv1.NodeStatus, memStat *wfv1.MemoizationStatus) { |