MCPcopy
hub / github.com/CodisLabs/codis / ProcessSlotAction

Method ProcessSlotAction

pkg/topom/topom_action.go:17–75  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

15)
16
17func (s *Topom) ProcessSlotAction() error {
18 for s.IsOnline() {
19 var (
20 marks = make(map[int]bool)
21 plans = make(map[int]bool)
22 )
23 var accept = func(m *models.SlotMapping) bool {
24 if marks[m.GroupId] || marks[m.Action.TargetId] {
25 return false
26 }
27 if plans[m.Id] {
28 return false
29 }
30 return true
31 }
32 var update = func(m *models.SlotMapping) bool {
33 if m.GroupId != 0 {
34 marks[m.GroupId] = true
35 }
36 marks[m.Action.TargetId] = true
37 plans[m.Id] = true
38 return true
39 }
40 var parallel = math2.MaxInt(1, s.config.MigrationParallelSlots)
41 for parallel > len(plans) {
42 _, ok, err := s.SlotActionPrepareFilter(accept, update)
43 if err != nil {
44 return err
45 } else if !ok {
46 break
47 }
48 }
49 if len(plans) == 0 {
50 return nil
51 }
52 var fut sync2.Future
53 for sid, _ := range plans {
54 fut.Add()
55 go func(sid int) {
56 log.Warnf("slot-[%d] process action", sid)
57 var err = s.processSlotAction(sid)
58 if err != nil {
59 status := fmt.Sprintf("[ERROR] Slot[%04d]: %s", sid, err)
60 s.action.progress.status.Store(status)
61 } else {
62 s.action.progress.status.Store("")
63 }
64 fut.Done(strconv.Itoa(sid), err)
65 }(sid)
66 }
67 for _, v := range fut.Wait() {
68 if v != nil {
69 return v.(error)
70 }
71 }
72 time.Sleep(time.Millisecond * 10)
73 }
74 return nil

Callers 2

Start Method · 0.95
TestSlotAction Function · 0.80

Calls 8

IsOnline Method · 0.95
Add Method · 0.95
processSlotAction Method · 0.95
Done Method · 0.95
Wait Method · 0.95
Warnf Method · 0.80
Sleep Method · 0.65

Tested by 1

TestSlotAction Function · 0.64