()
| 369 | } |
| 370 | |
| 371 | func (p *workerPool) workerPoolMain() { |
| 372 | ticker := time.NewTicker(200 * time.Millisecond) |
| 373 | defer ticker.Stop() |
| 374 | cases := make([]reflect.SelectCase, len(p.workers)+6) |
| 375 | for { |
| 376 | toSchedule := false |
| 377 | // 0 - pool stopper stopc |
| 378 | // 1 - p.saveReady.waitCh(1) |
| 379 | // 2 - p.recoverReady.waitCh(1) |
| 380 | // 3 - p.streamReady.waitCh(1) |
| 381 | // 4 - p.cciReady.waitCh(1) |
| 382 | // 5 - worker completedC |
| 383 | // 5 + len(workers) - ticker.C |
| 384 | cases[0] = reflect.SelectCase{ |
| 385 | Dir: reflect.SelectRecv, |
| 386 | Chan: reflect.ValueOf(p.poolStopper.ShouldStop()), |
| 387 | } |
| 388 | cases[1] = reflect.SelectCase{ |
| 389 | Dir: reflect.SelectRecv, |
| 390 | Chan: reflect.ValueOf(p.saveReady.waitCh(1)), |
| 391 | } |
| 392 | cases[2] = reflect.SelectCase{ |
| 393 | Dir: reflect.SelectRecv, |
| 394 | Chan: reflect.ValueOf(p.recoverReady.waitCh(1)), |
| 395 | } |
| 396 | cases[3] = reflect.SelectCase{ |
| 397 | Dir: reflect.SelectRecv, |
| 398 | Chan: reflect.ValueOf(p.streamReady.waitCh(1)), |
| 399 | } |
| 400 | cases[4] = reflect.SelectCase{ |
| 401 | Dir: reflect.SelectRecv, |
| 402 | Chan: reflect.ValueOf(p.cciReady.waitCh(1)), |
| 403 | } |
| 404 | for idx, w := range p.workers { |
| 405 | cases[5+idx] = reflect.SelectCase{ |
| 406 | Dir: reflect.SelectRecv, |
| 407 | Chan: reflect.ValueOf(w.completedC), |
| 408 | } |
| 409 | } |
| 410 | cases[5+len(p.workers)] = reflect.SelectCase{ |
| 411 | Dir: reflect.SelectRecv, |
| 412 | Chan: reflect.ValueOf(ticker.C), |
| 413 | } |
| 414 | chosen, _, _ := reflect.Select(cases) |
| 415 | if chosen == 0 { |
| 416 | p.workerStopper.Stop() |
| 417 | p.unloadNodes() |
| 418 | return |
| 419 | } else if chosen == 1 { |
| 420 | clusters := p.saveReady.getReadyMap(1) |
| 421 | p.loadNodes() |
| 422 | for cid := range clusters { |
| 423 | if j, ok := p.getSaveJob(cid); ok { |
| 424 | plog.Debugf("%s saveRequested for %d", p.nh.describe(), cid) |
| 425 | p.pending = append(p.pending, j) |
| 426 | toSchedule = true |
| 427 | } |
| 428 | } |
no test coverage detected