| 177 | } |
| 178 | |
| 179 | func (s *Topom) Start(routines bool) error { |
| 180 | s.mu.Lock() |
| 181 | defer s.mu.Unlock() |
| 182 | if s.closed { |
| 183 | return ErrClosedTopom |
| 184 | } |
| 185 | if s.online { |
| 186 | return nil |
| 187 | } else { |
| 188 | if err := s.store.Acquire(s.model); err != nil { |
| 189 | log.ErrorErrorf(err, "store: acquire lock of %s failed", s.config.ProductName) |
| 190 | return errors.Errorf("store: acquire lock of %s failed", s.config.ProductName) |
| 191 | } |
| 192 | s.online = true |
| 193 | } |
| 194 | |
| 195 | if !routines { |
| 196 | return nil |
| 197 | } |
| 198 | ctx, err := s.newContext() |
| 199 | if err != nil { |
| 200 | return err |
| 201 | } |
| 202 | s.rewatchSentinels(ctx.sentinel.Servers) |
| 203 | |
| 204 | go func() { |
| 205 | for !s.IsClosed() { |
| 206 | if s.IsOnline() { |
| 207 | w, _ := s.RefreshRedisStats(time.Second) |
| 208 | if w != nil { |
| 209 | w.Wait() |
| 210 | } |
| 211 | } |
| 212 | time.Sleep(time.Second) |
| 213 | } |
| 214 | }() |
| 215 | |
| 216 | go func() { |
| 217 | for !s.IsClosed() { |
| 218 | if s.IsOnline() { |
| 219 | w, _ := s.RefreshProxyStats(time.Second) |
| 220 | if w != nil { |
| 221 | w.Wait() |
| 222 | } |
| 223 | } |
| 224 | time.Sleep(time.Second) |
| 225 | } |
| 226 | }() |
| 227 | |
| 228 | go func() { |
| 229 | for !s.IsClosed() { |
| 230 | if s.IsOnline() { |
| 231 | if err := s.ProcessSlotAction(); err != nil { |
| 232 | log.WarnErrorf(err, "process slot action failed") |
| 233 | time.Sleep(time.Second * 5) |
| 234 | } |
| 235 | } |
| 236 | time.Sleep(time.Second) |