list all running workflows to initialize throttler and syncManager
(ctx context.Context)
| 452 | |
| 453 | // list all running workflows to initialize throttler and syncManager |
| 454 | func (wfc *WorkflowController) initManagers(ctx context.Context) error { |
| 455 | labelSelector := labels.NewSelector().Add(util.InstanceIDRequirement(wfc.Config.InstanceID)) |
| 456 | req, _ := labels.NewRequirement(common.LabelKeyPhase, selection.Equals, []string{string(wfv1.WorkflowRunning)}) |
| 457 | if req != nil { |
| 458 | labelSelector = labelSelector.Add(*req) |
| 459 | } |
| 460 | listOpts := metav1.ListOptions{LabelSelector: labelSelector.String()} |
| 461 | wfList, err := wfc.wfclientset.ArgoprojV1alpha1().Workflows(wfc.GetManagedNamespace()).List(ctx, listOpts) |
| 462 | if err != nil { |
| 463 | return err |
| 464 | } |
| 465 | |
| 466 | // A non-nil error means a recorded lock holder could not be re-established |
| 467 | // (undecodable lock name, or an unavailable database session). This is fatal |
| 468 | // by design: we fail closed rather than risk a silent double-acquire. |
| 469 | staleHolds, err := wfc.syncManager.Initialize(ctx, wfList.Items) |
| 470 | if err != nil { |
| 471 | return err |
| 472 | } |
| 473 | // Stale holds are workflows whose recorded hold on a database-backed lock |
| 474 | // the database no longer has (e.g. expired while the controller was down, |
| 475 | // possibly acquired by someone else since). The database is the source of |
| 476 | // truth, so these workflows must not keep running on a hold it does not |
| 477 | // back: fail them; persistUpdates releases any locks they still hold. |
| 478 | for _, stale := range staleHolds { |
| 479 | woc := newWorkflowOperationCtx(ctx, stale.WF, wfc) |
| 480 | wocCtx := logging.WithLogger(ctx, woc.log) |
| 481 | woc.markWorkflowFailed(wocCtx, fmt.Sprintf("Failed to re-establish synchronization lock at controller startup: %s", stale.Reason)) |
| 482 | woc.persistUpdates(wocCtx) |
| 483 | } |
| 484 | |
| 485 | if err := wfc.throttler.Init(wfList.Items); err != nil { |
| 486 | return err |
| 487 | } |
| 488 | |
| 489 | return nil |
| 490 | } |
| 491 | |
| 492 | func (wfc *WorkflowController) runConfigMapWatcher(ctx context.Context) { |
| 493 | defer runtimeutil.HandleCrashWithContext(ctx, runtimeutil.PanicHandlers...) |