MCPcopy
hub / github.com/fluid-cloudnative/fluid / SetupWorkers

Method SetupWorkers

pkg/ctrl/worker.go:109–160  ·  view source on GitHub ↗

SetupWorkers checks the desired and current replicas of workers and makes an update over the status by setting phases and conditions. The function calls for a status update and finally returns error if anything unexpected happens.

(runtime base.RuntimeInterface,
	currentStatus datav1alpha1.RuntimeStatus,
	workers *appsv1.StatefulSet)

Source from the content-addressed store, hash-verified

107// over the status by setting phases and conditions. The function
108// calls for a status update and finally returns error if anything unexpected happens.
109func (e *Helper) SetupWorkers(runtime base.RuntimeInterface,
110 currentStatus datav1alpha1.RuntimeStatus,
111 workers *appsv1.StatefulSet) (err error) {
112
113 var (
114 desireReplicas int32 = runtime.Replicas()
115 actualReplicas int32 = 0
116 )
117
118 if workers.Spec.Replicas != nil {
119 actualReplicas = *workers.Spec.Replicas
120 }
121
122 if actualReplicas != desireReplicas {
123 // workerToUpdate, err := e.buildWorkersAffinity(workers)
124
125 workerToUpdate, err := e.BuildWorkersAffinity(workers)
126 if err != nil {
127 return err
128 }
129
130 workerToUpdate.Spec.Replicas = &desireReplicas
131 err = e.client.Update(context.TODO(), workerToUpdate)
132 if err != nil {
133 return err
134 }
135
136 workers = workerToUpdate
137 } else {
138 e.log.V(1).Info("Nothing to do for syncing")
139 }
140
141 if *workers.Spec.Replicas != runtime.GetStatus().DesiredWorkerNumberScheduled {
142 // DO NOT DeepCopy here because the status might be updated somewhere else
143 statusToUpdate := runtime.GetStatus()
144
145 cond := utils.NewRuntimeCondition(datav1alpha1.RuntimeWorkersInitialized, datav1alpha1.RuntimeWorkersInitializedReason,
146 "The workers are initialized.", corev1.ConditionTrue)
147 statusToUpdate.Conditions =
148 utils.UpdateRuntimeCondition(statusToUpdate.Conditions,
149 cond)
150
151 status := *statusToUpdate
152 if !reflect.DeepEqual(status, currentStatus) {
153 e.log.V(1).Info("Update runtime status", "runtime", fmt.Sprintf("%s/%s", runtime.GetNamespace(), runtime.GetName()))
154 return e.client.Status().Update(context.TODO(), runtime)
155 }
156 }
157
158 return
159
160}
161
162// CheckAndSyncWorkerStatus checks the worker statefulset's status and update it to runtime's status accordingly.
163// It returns readyOrPartialReady to indicate if the worker statefulset is (partial) ready or not ready.

Callers 1

SyncReplicasMethod · 0.95

Calls 8

BuildWorkersAffinityMethod · 0.95
NewRuntimeConditionFunction · 0.92
UpdateRuntimeConditionFunction · 0.92
UpdateMethod · 0.80
ReplicasMethod · 0.65
GetStatusMethod · 0.65
GetNamespaceMethod · 0.65
GetNameMethod · 0.65

Tested by

no test coverage detected