MCPcopy
hub / github.com/argoproj/argo-workflows / initManagers

Method initManagers

workflow/controller/controller.go:454–490  ·  view source on GitHub ↗

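Lists all running workflows to initialize the throttler and the syncManager; workflows whose recorded synchronization-lock hold cannot be re-established at startup are marked failed.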

(ctx context.Context)

Source from the content-addressed store, hash-verified

452
453// list all running workflows to initialize throttler and syncManager
454func (wfc *WorkflowController) initManagers(ctx context.Context) error {
455 labelSelector := labels.NewSelector().Add(util.InstanceIDRequirement(wfc.Config.InstanceID))
456 req, _ := labels.NewRequirement(common.LabelKeyPhase, selection.Equals, []string{string(wfv1.WorkflowRunning)})
457 if req != nil {
458 labelSelector = labelSelector.Add(*req)
459 }
460 listOpts := metav1.ListOptions{LabelSelector: labelSelector.String()}
461 wfList, err := wfc.wfclientset.ArgoprojV1alpha1().Workflows(wfc.GetManagedNamespace()).List(ctx, listOpts)
462 if err != nil {
463 return err
464 }
465
466 // A non-nil error means a recorded lock holder could not be re-established
467 // (undecodable lock name, or an unavailable database session). This is fatal
468 // by design: we fail closed rather than risk a silent double-acquire.
469 staleHolds, err := wfc.syncManager.Initialize(ctx, wfList.Items)
470 if err != nil {
471 return err
472 }
473 // Stale holds are workflows whose recorded hold on a database-backed lock
474 // the database no longer has (e.g. expired while the controller was down,
475 // possibly acquired by someone else since). The database is the source of
476 // truth, so these workflows must not keep running on a hold it does not
477 // back: fail them; persistUpdates releases any locks they still hold.
478 for _, stale := range staleHolds {
479 woc := newWorkflowOperationCtx(ctx, stale.WF, wfc)
480 wocCtx := logging.WithLogger(ctx, woc.log)
481 woc.markWorkflowFailed(wocCtx, fmt.Sprintf("Failed to re-establish synchronization lock at controller startup: %s", stale.Reason))
482 woc.persistUpdates(wocCtx)
483 }
484
485 if err := wfc.throttler.Init(wfList.Items); err != nil {
486 return err
487 }
488
489 return nil
490}
491
492func (wfc *WorkflowController) runConfigMapWatcher(ctx context.Context) {
493 defer runtimeutil.HandleCrashWithContext(ctx, runtimeutil.PanicHandlers...)

Callers 2

RunMethod · 0.95
newControllerFunction · 0.95

Calls 13

GetManagedNamespaceMethod · 0.95
InstanceIDRequirementFunction · 0.92
WithLoggerFunction · 0.92
newWorkflowOperationCtxFunction · 0.85
InitializeMethod · 0.80
markWorkflowFailedMethod · 0.80
persistUpdatesMethod · 0.80
AddMethod · 0.65
ListMethod · 0.65
WorkflowsMethod · 0.65
ArgoprojV1alpha1Method · 0.65
InitMethod · 0.65

Tested by 1

newControllerFunction · 0.76