MCPcopy
hub / github.com/vulcand/vulcand / Subscribe

Method Subscribe

engine/etcdng/v3/etcd.go:625–679  ·  view source on GitHub ↗

Subscribe watches etcd changes and generates structured events telling vulcand to add or delete frontends, hosts, etc. It is a blocking function.

(changes chan interface{}, afterIdx uint64, cancelC chan struct{})

Source from the content-addressed store, hash-verified

623// Subscribe watches etcd changes and generates structured events telling vulcand to add or delete frontends, hosts etc.
624// It is a blocking function.
625func (n *ng) Subscribe(changes chan interface{}, afterIdx uint64, cancelC chan struct{}) error {
626 watcher := etcd.NewWatcher(n.client)
627 defer watcher.Close()
628
629 var watchChan etcd.WatchChan
630 ready := make(chan struct{})
631 go func() {
632 watchChan = watcher.Watch(etcd.WithRequireLeader(n.context), n.etcdKey,
633 etcd.WithRev(int64(afterIdx)), etcd.WithPrefix())
634 close(ready)
635 }()
636 select {
637 case <-ready:
638 log.Infof("begin watching: etcd revision %d", afterIdx)
639 case <-time.After(time.Second * 10):
640 return errors.New("timed out while waiting for watcher.Watch() to start")
641 }
642
643 for response := range watchChan {
644 if response.Canceled {
645 log.Warn("etcd watcher cancelled")
646
647 if err := response.Err(); err != nil {
648 log.Errorf("etcd watcher cancelled with error: %v", err)
649 return err
650 }
651
652 return errors.New("etcd watcher failed without error message (canceled)")
653 }
654
655 if err := response.Err(); err != nil {
656 log.Errorf("etcd watcher received error: %v", err)
657 return err
658 }
659
660 for _, event := range response.Events {
661 log.WithFields(eventToFields(event)).Infof("%s: %s", event.Type, event.Kv.Key)
662 change, err := n.parseChange(event)
663 if err != nil {
664 log.WithFields(eventToFields(event)).Warningf("ignoring event; error: %s", err)
665 continue
666 }
667 if change != nil {
668 log.Infof("%v", change)
669 select {
670 case changes <- change:
671 case <-cancelC:
672 return nil
673 }
674 }
675 }
676 }
677
678 return errors.New("etcd watcher channel closed without graceful stop")
679}
680
// MatcherFn is the signature of a function that receives an etcd event and
// returns a value together with an error.
// NOTE(review): presumably the returned value is the structured change built
// from the event, as parseChange does in Subscribe — confirm against the callers.
681type MatcherFn func(*etcd.Event) (interface{}, error)
682

Callers

nothing calls this directly

Calls 4

parseChangeMethod · 0.95
eventToFieldsFunction · 0.85
InfofMethod · 0.80
CloseMethod · 0.65

Tested by

no test coverage detected