()
| 765 | } |
| 766 | |
| 767 | func (s *EtcdServer) run() { |
| 768 | lg := s.Logger() |
| 769 | |
| 770 | sn, err := s.r.raftStorage.Snapshot() |
| 771 | if err != nil { |
| 772 | lg.Panic("failed to get snapshot from Raft storage", zap.Error(err)) |
| 773 | } |
| 774 | |
| 775 | // asynchronously accept toApply packets, dispatch progress in-order |
| 776 | sched := schedule.NewFIFOScheduler(lg) |
| 777 | |
| 778 | rh := &raftReadyHandler{ |
| 779 | getLead: func() (lead uint64) { return s.getLead() }, |
| 780 | updateLead: func(lead uint64) { s.setLead(lead) }, |
| 781 | updateLeadership: func(newLeader bool) { |
| 782 | if !s.isLeader() { |
| 783 | if s.lessor != nil { |
| 784 | s.lessor.Demote() |
| 785 | } |
| 786 | if s.compactor != nil { |
| 787 | s.compactor.Pause() |
| 788 | } |
| 789 | } else { |
| 790 | if newLeader { |
| 791 | t := time.Now() |
| 792 | s.leadTimeMu.Lock() |
| 793 | s.leadElectedTime = t |
| 794 | s.leadTimeMu.Unlock() |
| 795 | } |
| 796 | if s.compactor != nil { |
| 797 | s.compactor.Resume() |
| 798 | } |
| 799 | } |
| 800 | if newLeader { |
| 801 | s.leaderChanged.Notify() |
| 802 | } |
| 803 | // TODO: remove the nil checking |
| 804 | // current test utility does not provide the stats |
| 805 | if s.stats != nil { |
| 806 | s.stats.BecomeLeader() |
| 807 | } |
| 808 | }, |
| 809 | updateCommittedIndex: func(ci uint64) { |
| 810 | cci := s.getCommittedIndex() |
| 811 | if ci > cci { |
| 812 | s.setCommittedIndex(ci) |
| 813 | } |
| 814 | }, |
| 815 | } |
| 816 | s.r.start(rh) |
| 817 | |
| 818 | ep := etcdProgress{ |
| 819 | confState: sn.Metadata.ConfState, |
| 820 | diskSnapshotIndex: sn.Metadata.Index, |
| 821 | memorySnapshotIndex: sn.Metadata.Index, |
| 822 | appliedt: sn.Metadata.Term, |
| 823 | appliedi: sn.Metadata.Index, |
| 824 | } |
no test coverage detected