hub / github.com/NVIDIA/aistore / WaitForClusterState

Function WaitForClusterState

devtools/tutils/node.go:171–261

WaitForClusterState waits until a cluster reaches the specified state, meaning:
- smap has a version larger than origVersion
- number of active proxies is equal to proxyCnt, unless proxyCnt == 0
- number of active targets is equal to targetCnt, unless targetCnt == 0

It returns the smap which satisfies those requirements.
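The three conditions above can be sketched as a single predicate. This is a minimal illustration, not the tutils code: the `nodesCnt` type and its `satisfied` method are re-created here under the assumption that a zero count means "any count is acceptable", and `clusterReady` is a hypothetical helper name. The names origVersion, proxyCnt and targetCnt in the description appear to correspond to the `origVer`, `pcnt` and `tcnt` parameters of the signature.

```go
package main

import "fmt"

// nodesCnt is an expected node count; zero means "any count is acceptable".
// Hypothetical re-creation: the real definition in tutils is not shown here.
type nodesCnt int

// satisfied reports whether the actual count meets the expectation.
func (n nodesCnt) satisfied(actual int) bool {
	return n == 0 || int(n) == actual
}

// clusterReady (hypothetical helper) combines the three conditions:
// target count, proxy count, and a map version newer than origVer.
func clusterReady(version, origVer int64, proxies, targets int, expPrx, expTgt nodesCnt) bool {
	return expTgt.satisfied(targets) &&
		expPrx.satisfied(proxies) &&
		version > origVer
}

func main() {
	// Target count is unconstrained (0), proxies match, version advanced.
	fmt.Println(clusterReady(8, 7, 3, 5, 3, 0)) // true
	// Counts match, but the version has not advanced past origVer.
	fmt.Println(clusterReady(7, 7, 3, 5, 3, 5)) // false
}
```

Passing 0 for a count is what lets a caller wait only on a version bump without pinning the cluster size.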

(proxyURL, reason string, origVer int64, pcnt, tcnt int, ignoreIDs ...string)

Source from the content-addressed store, hash-verified

// It returns the smap which satisfies those requirements.
// NOTE: Upon successful return from this function cluster state might have already changed.
func WaitForClusterState(proxyURL, reason string, origVer int64, pcnt, tcnt int, ignoreIDs ...string) (*cluster.Smap, error) {
	var (
		lastVersion int64
		smapChangeDeadline, timeStart, opDeadline time.Time

		expPrx = nodesCnt(pcnt)
		expTgt = nodesCnt(tcnt)

		baseParams = BaseAPIParams(proxyURL)
		loopCnt int
		satisfied bool
	)

	timeStart = time.Now()
	smapChangeDeadline = timeStart.Add(2 * proxyChangeLatency)
	opDeadline = timeStart.Add(3 * proxyChangeLatency)

	tlog.Logf("Waiting for: %q(p%d, t%d, Smap > v%d)\n", reason, expPrx, expTgt, origVer)

	// Repeat until success or timeout.
	for {
		smap, err := api.GetClusterMap(baseParams)
		if err != nil {
			if !cos.IsRetriableConnErr(err) {
				return nil, err
			}
			tlog.Logf("%v\n", err)
			goto next
		}
		satisfied = expTgt.satisfied(smap.CountActiveTargets()) &&
			expPrx.satisfied(smap.CountActiveProxies()) &&
			smap.Version > origVer
		if !satisfied {
			if d := time.Since(timeStart); d > 7*time.Second {
				p := "primary"
				if smap.Primary.PubNet.URL != proxyURL {
					p = proxyURL
				}
				tlog.Logf("Polling %s[%s] for (t=%d, p=%d, Smap > v%d)\n", p, smap, expTgt, expPrx, origVer)
			}
		}
		if smap.Version != lastVersion {
			smapChangeDeadline = cos.MinTime(time.Now().Add(proxyChangeLatency), opDeadline)
		}
		// if the primary's map changed to the state we want, wait for the map get populated
		if satisfied {
			syncedSmap := &cluster.Smap{}
			cos.CopyStruct(syncedSmap, smap)

			// skip primary proxy and mock targets
			var proxyID string
			for _, p := range smap.Pmap {
				if p.PubNet.URL == proxyURL {
					proxyID = p.ID()
				}
			}

			idsToIgnore := cos.NewStringSet(MockDaemonID, proxyID)

Callers 15

TestConfigSyncToNewNode · Function · 0.92
killRestorePrimary · Function · 0.92
killRestoreDiffIP · Function · 0.92
primaryAndTargetCrash · Function · 0.92
proxyCrash · Function · 0.92
primaryAndProxyCrash · Function · 0.92
targetRejoin · Function · 0.92
crashAndFastRestore · Function · 0.92
joinWhileVoteInProgress · Function · 0.92

Calls 15

Add · Method · 0.95
Logf · Function · 0.92
GetClusterMap · Function · 0.92
IsRetriableConnErr · Function · 0.92
MinTime · Function · 0.92
CopyStruct · Function · 0.92
NewStringSet · Function · 0.92
MinDuration · Function · 0.92
nodesCnt · TypeAlias · 0.85
BaseAPIParams · Function · 0.85
_waitMapVersionSync · Function · 0.85
satisfied · Method · 0.80

Tested by 15

TestConfigSyncToNewNode · Function · 0.74
killRestorePrimary · Function · 0.74
killRestoreDiffIP · Function · 0.74
primaryAndTargetCrash · Function · 0.74
proxyCrash · Function · 0.74
primaryAndProxyCrash · Function · 0.74
targetRejoin · Function · 0.74
crashAndFastRestore · Function · 0.74
joinWhileVoteInProgress · Function · 0.74