MCPcopy
hub / github.com/lni/dragonboat / workerPoolMain

Method workerPoolMain

engine.go:371–465  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

369}
370
371func (p *workerPool) workerPoolMain() {
372 ticker := time.NewTicker(200 * time.Millisecond)
373 defer ticker.Stop()
374 cases := make([]reflect.SelectCase, len(p.workers)+6)
375 for {
376 toSchedule := false
377 // 0 - pool stopper stopc
378 // 1 - p.saveReady.waitCh(1)
379 // 2 - p.recoverReady.waitCh(1)
380 // 3 - p.streamReady.waitCh(1)
381 // 4 - p.cciReady.waitCh(1)
382 // 5 - worker completedC
383 // 5 + len(workers) - ticker.C
384 cases[0] = reflect.SelectCase{
385 Dir: reflect.SelectRecv,
386 Chan: reflect.ValueOf(p.poolStopper.ShouldStop()),
387 }
388 cases[1] = reflect.SelectCase{
389 Dir: reflect.SelectRecv,
390 Chan: reflect.ValueOf(p.saveReady.waitCh(1)),
391 }
392 cases[2] = reflect.SelectCase{
393 Dir: reflect.SelectRecv,
394 Chan: reflect.ValueOf(p.recoverReady.waitCh(1)),
395 }
396 cases[3] = reflect.SelectCase{
397 Dir: reflect.SelectRecv,
398 Chan: reflect.ValueOf(p.streamReady.waitCh(1)),
399 }
400 cases[4] = reflect.SelectCase{
401 Dir: reflect.SelectRecv,
402 Chan: reflect.ValueOf(p.cciReady.waitCh(1)),
403 }
404 for idx, w := range p.workers {
405 cases[5+idx] = reflect.SelectCase{
406 Dir: reflect.SelectRecv,
407 Chan: reflect.ValueOf(w.completedC),
408 }
409 }
410 cases[5+len(p.workers)] = reflect.SelectCase{
411 Dir: reflect.SelectRecv,
412 Chan: reflect.ValueOf(ticker.C),
413 }
414 chosen, _, _ := reflect.Select(cases)
415 if chosen == 0 {
416 p.workerStopper.Stop()
417 p.unloadNodes()
418 return
419 } else if chosen == 1 {
420 clusters := p.saveReady.getReadyMap(1)
421 p.loadNodes()
422 for cid := range clusters {
423 if j, ok := p.getSaveJob(cid); ok {
424 plog.Debugf("%s saveRequested for %d", p.nh.describe(), cid)
425 p.pending = append(p.pending, j)
426 toSchedule = true
427 }
428 }

Callers 1

newWorkerPoolFunction · 0.95

Calls 14

unloadNodesMethod · 0.95
loadNodesMethod · 0.95
getSaveJobMethod · 0.95
getRecoverJobMethod · 0.95
getStreamJobMethod · 0.95
completedMethod · 0.95
scheduleMethod · 0.95
waitChMethod · 0.80
getReadyMapMethod · 0.80
StopMethod · 0.65
ShouldStopMethod · 0.65
DebugfMethod · 0.65

Tested by

no test coverage detected