| 15 | ) |
| 16 | |
| 17 | func (s *Topom) ProcessSlotAction() error { |
| 18 | for s.IsOnline() { |
| 19 | var ( |
| 20 | marks = make(map[int]bool) |
| 21 | plans = make(map[int]bool) |
| 22 | ) |
| 23 | var accept = func(m *models.SlotMapping) bool { |
| 24 | if marks[m.GroupId] || marks[m.Action.TargetId] { |
| 25 | return false |
| 26 | } |
| 27 | if plans[m.Id] { |
| 28 | return false |
| 29 | } |
| 30 | return true |
| 31 | } |
| 32 | var update = func(m *models.SlotMapping) bool { |
| 33 | if m.GroupId != 0 { |
| 34 | marks[m.GroupId] = true |
| 35 | } |
| 36 | marks[m.Action.TargetId] = true |
| 37 | plans[m.Id] = true |
| 38 | return true |
| 39 | } |
| 40 | var parallel = math2.MaxInt(1, s.config.MigrationParallelSlots) |
| 41 | for parallel > len(plans) { |
| 42 | _, ok, err := s.SlotActionPrepareFilter(accept, update) |
| 43 | if err != nil { |
| 44 | return err |
| 45 | } else if !ok { |
| 46 | break |
| 47 | } |
| 48 | } |
| 49 | if len(plans) == 0 { |
| 50 | return nil |
| 51 | } |
| 52 | var fut sync2.Future |
| 53 | for sid, _ := range plans { |
| 54 | fut.Add() |
| 55 | go func(sid int) { |
| 56 | log.Warnf("slot-[%d] process action", sid) |
| 57 | var err = s.processSlotAction(sid) |
| 58 | if err != nil { |
| 59 | status := fmt.Sprintf("[ERROR] Slot[%04d]: %s", sid, err) |
| 60 | s.action.progress.status.Store(status) |
| 61 | } else { |
| 62 | s.action.progress.status.Store("") |
| 63 | } |
| 64 | fut.Done(strconv.Itoa(sid), err) |
| 65 | }(sid) |
| 66 | } |
| 67 | for _, v := range fut.Wait() { |
| 68 | if v != nil { |
| 69 | return v.(error) |
| 70 | } |
| 71 | } |
| 72 | time.Sleep(time.Millisecond * 10) |
| 73 | } |
| 74 | return nil |