Subscribe watches etcd changes and generates structured events telling vulcand to add or delete frontends, hosts etc. It is a blocking function.
(changes chan interface{}, afterIdx uint64, cancelC chan struct{})
| 623 | // Subscribe watches etcd changes and generates structured events telling vulcand to add or delete frontends, hosts etc. |
| 624 | // It is a blocking function. |
| 625 | func (n *ng) Subscribe(changes chan interface{}, afterIdx uint64, cancelC chan struct{}) error { |
| 626 | watcher := etcd.NewWatcher(n.client) |
| 627 | defer watcher.Close() |
| 628 | |
| 629 | var watchChan etcd.WatchChan |
| 630 | ready := make(chan struct{}) |
| 631 | go func() { |
| 632 | watchChan = watcher.Watch(etcd.WithRequireLeader(n.context), n.etcdKey, |
| 633 | etcd.WithRev(int64(afterIdx)), etcd.WithPrefix()) |
| 634 | close(ready) |
| 635 | }() |
| 636 | select { |
| 637 | case <-ready: |
| 638 | log.Infof("begin watching: etcd revision %d", afterIdx) |
| 639 | case <-time.After(time.Second * 10): |
| 640 | return errors.New("timed out while waiting for watcher.Watch() to start") |
| 641 | } |
| 642 | |
| 643 | for response := range watchChan { |
| 644 | if response.Canceled { |
| 645 | log.Warn("etcd watcher cancelled") |
| 646 | |
| 647 | if err := response.Err(); err != nil { |
| 648 | log.Errorf("etcd watcher cancelled with error: %v", err) |
| 649 | return err |
| 650 | } |
| 651 | |
| 652 | return errors.New("etcd watcher failed without error message (canceled)") |
| 653 | } |
| 654 | |
| 655 | if err := response.Err(); err != nil { |
| 656 | log.Errorf("etcd watcher received error: %v", err) |
| 657 | return err |
| 658 | } |
| 659 | |
| 660 | for _, event := range response.Events { |
| 661 | log.WithFields(eventToFields(event)).Infof("%s: %s", event.Type, event.Kv.Key) |
| 662 | change, err := n.parseChange(event) |
| 663 | if err != nil { |
| 664 | log.WithFields(eventToFields(event)).Warningf("ignoring event; error: %s", err) |
| 665 | continue |
| 666 | } |
| 667 | if change != nil { |
| 668 | log.Infof("%v", change) |
| 669 | select { |
| 670 | case changes <- change: |
| 671 | case <-cancelC: |
| 672 | return nil |
| 673 | } |
| 674 | } |
| 675 | } |
| 676 | } |
| 677 | |
| 678 | return errors.New("etcd watcher channel closed without graceful stop") |
| 679 | } |
| 680 | |
| 681 | type MatcherFn func(*etcd.Event) (interface{}, error) |
| 682 |
nothing calls this directly
no test coverage detected