MCPcopy
hub / github.com/CodisLabs/codis / rewatchSentinels

Method rewatchSentinels

pkg/proxy/proxy.go:311–370  ·  view source on GitHub ↗
(servers []string)

Source from the content-addressed store, hash-verified

309}
310
311func (s *Proxy) rewatchSentinels(servers []string) {
312 if s.ha.monitor != nil {
313 s.ha.monitor.Cancel()
314 s.ha.monitor = nil
315 s.ha.masters = nil
316 }
317 if len(servers) != 0 {
318 s.ha.monitor = redis.NewSentinel(s.config.ProductName, s.config.ProductAuth)
319 s.ha.monitor.LogFunc = log.Warnf
320 s.ha.monitor.ErrFunc = log.WarnErrorf
321 go func(p *redis.Sentinel) {
322 var trigger = make(chan struct{}, 1)
323 delayUntil := func(deadline time.Time) {
324 for !p.IsCanceled() {
325 var d = deadline.Sub(time.Now())
326 if d <= 0 {
327 return
328 }
329 time.Sleep(math2.MinDuration(d, time.Second))
330 }
331 }
332 go func() {
333 defer close(trigger)
334 callback := func() {
335 select {
336 case trigger <- struct{}{}:
337 default:
338 }
339 }
340 for !p.IsCanceled() {
341 timeout := time.Minute * 15
342 retryAt := time.Now().Add(time.Second * 10)
343 if !p.Subscribe(servers, timeout, callback) {
344 delayUntil(retryAt)
345 } else {
346 callback()
347 }
348 }
349 }()
350 go func() {
351 for range trigger {
352 var success int
353 for i := 0; i != 10 && !p.IsCanceled() && success != 2; i++ {
354 timeout := time.Second * 5
355 masters, err := p.Masters(servers, timeout)
356 if err != nil {
357 log.WarnErrorf(err, "[%p] fetch group masters failed", s)
358 } else {
359 if !p.IsCanceled() {
360 s.SwitchMasters(masters)
361 }
362 success += 1
363 }
364 delayUntil(time.Now().Add(time.Second * 5))
365 }
366 }
367 }()
368 }(s.ha.monitor)

Callers 2

SetSentinelsMethod · 0.95
RewatchSentinelsMethod · 0.95

Calls 8

SwitchMastersMethod · 0.95
IsCanceledMethod · 0.80
SubMethod · 0.80
SubscribeMethod · 0.80
MastersMethod · 0.80
WarnErrorfMethod · 0.80
SleepMethod · 0.65
AddMethod · 0.45

Tested by

no test coverage detected