MCPcopy
hub / github.com/argoproj/argo-workflows / reconcileTaskSet

Method reconcileTaskSet

workflow/controller/taskset.go:137–170  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

135}
136
137func (woc *wfOperationCtx) reconcileTaskSet(ctx context.Context) error {
138 workflowTaskSet, err := woc.getWorkflowTaskSet()
139 if err != nil {
140 return err
141 }
142
143 woc.log.Info(ctx, "TaskSet Reconciliation")
144 if workflowTaskSet != nil && len(workflowTaskSet.Status.Nodes) > 0 {
145 for nodeID, taskResult := range workflowTaskSet.Status.Nodes {
146 node, err := woc.wf.Status.Nodes.Get(nodeID)
147 if err != nil {
148 woc.log.Warn(ctx, "returning but assumed validity before")
149 woc.log.WithField("nodeID", nodeID).Error(ctx, "was unable to obtain node for nodeID")
150 return err
151 }
152
153 node.Outputs = taskResult.Outputs.DeepCopy()
154 node.Phase = taskResult.Phase
155 node.Message = taskResult.Message
156 node.FinishedAt = metav1.Now()
157
158 woc.wf.Status.Nodes.Set(ctx, nodeID, *node)
159 if node.MemoizationStatus != nil && node.Succeeded() {
160 c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName)
161 err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs)
162 if err != nil {
163 woc.log.WithFields(logging.Fields{"nodeID": node.ID}).WithError(err).Error(ctx, "Failed to save node outputs to cache")
164 }
165 }
166 woc.updated = true
167 }
168 }
169 return woc.createTaskSet(ctx)
170}
171
172func (woc *wfOperationCtx) createTaskSet(ctx context.Context) error {
173 if len(woc.taskSet) == 0 {

Calls 14

getWorkflowTaskSetMethod · 0.95
createTaskSetMethod · 0.95
SucceededMethod · 0.80
InfoMethod · 0.65
GetMethod · 0.65
WarnMethod · 0.65
ErrorMethod · 0.65
WithFieldMethod · 0.65
GetCacheMethod · 0.65
SaveMethod · 0.65
WithErrorMethod · 0.65
WithFieldsMethod · 0.65

Tested by 2