MCPcopy
hub / github.com/CodisLabs/codis / Subscribe

Method Subscribe

pkg/utils/redis/sentinel.go:175–219  ·  view source on GitHub ↗
(sentinels []string, timeout time.Duration, onMajoritySubscribed func())

Source from the content-addressed store, hash-verified

173}
174
175func (s *Sentinel) Subscribe(sentinels []string, timeout time.Duration, onMajoritySubscribed func()) bool {
176 cntx, cancel := context.WithTimeout(s.Context, timeout)
177 defer cancel()
178
179 timeout += time.Second * 5
180 results := make(chan bool, len(sentinels))
181
182 var majority = 1 + len(sentinels)/2
183
184 var subscribed atomic2.Int64
185 for i := range sentinels {
186 go func(sentinel string) {
187 notified, err := s.subscribeDispatch(cntx, sentinel, timeout, func() {
188 if subscribed.Incr() == int64(majority) {
189 onMajoritySubscribed()
190 }
191 })
192 if err != nil {
193 s.errorf(err, "sentinel-[%s] subscribe failed", sentinel)
194 }
195 results <- notified
196 }(sentinels[i])
197 }
198
199 for alive := len(sentinels); ; alive-- {
200 if alive < majority {
201 if cntx.Err() == nil {
202 s.printf("sentinel subscribe lost majority (%d/%d)", alive, len(sentinels))
203 }
204 return false
205 }
206 select {
207 case <-cntx.Done():
208 if cntx.Err() != context.DeadlineExceeded {
209 s.printf("sentinel subscribe canceled (%v)", cntx.Err())
210 }
211 return false
212 case notified := <-results:
213 if notified {
214 s.printf("sentinel subscribe notified +switch-master")
215 return true
216 }
217 }
218 }
219}
220
221func (s *Sentinel) existsCommand(client *Client, names []string) (map[string]bool, error) {
222 go func() {

Callers 2

rewatchSentinels (Method) · 0.80
rewatchSentinels (Method) · 0.80

Calls 5

subscribeDispatch (Method) · 0.95
Incr (Method) · 0.95
errorf (Method) · 0.95
printf (Method) · 0.95
Done (Method) · 0.80

Tested by

no test coverage detected