MCPcopy
hub / github.com/etcd-io/etcd / run

Method run

server/etcdserver/server.go:767–867  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

765}
766
767func (s *EtcdServer) run() {
768 lg := s.Logger()
769
770 sn, err := s.r.raftStorage.Snapshot()
771 if err != nil {
772 lg.Panic("failed to get snapshot from Raft storage", zap.Error(err))
773 }
774
775 // asynchronously accept toApply packets, dispatch progress in-order
776 sched := schedule.NewFIFOScheduler(lg)
777
778 rh := &raftReadyHandler{
779 getLead: func() (lead uint64) { return s.getLead() },
780 updateLead: func(lead uint64) { s.setLead(lead) },
781 updateLeadership: func(newLeader bool) {
782 if !s.isLeader() {
783 if s.lessor != nil {
784 s.lessor.Demote()
785 }
786 if s.compactor != nil {
787 s.compactor.Pause()
788 }
789 } else {
790 if newLeader {
791 t := time.Now()
792 s.leadTimeMu.Lock()
793 s.leadElectedTime = t
794 s.leadTimeMu.Unlock()
795 }
796 if s.compactor != nil {
797 s.compactor.Resume()
798 }
799 }
800 if newLeader {
801 s.leaderChanged.Notify()
802 }
803 // TODO: remove the nil checking
804 // current test utility does not provide the stats
805 if s.stats != nil {
806 s.stats.BecomeLeader()
807 }
808 },
809 updateCommittedIndex: func(ci uint64) {
810 cci := s.getCommittedIndex()
811 if ci > cci {
812 s.setCommittedIndex(ci)
813 }
814 },
815 }
816 s.r.start(rh)
817
818 ep := etcdProgress{
819 confState: sn.Metadata.ConfState,
820 diskSnapshotIndex: sn.Metadata.Index,
821 memorySnapshotIndex: sn.Metadata.Index,
822 appliedt: sn.Metadata.Term,
823 appliedi: sn.Metadata.Index,
824 }

Callers 1

start · Method · 0.95

Calls 15

Logger · Method · 0.95
getLead · Method · 0.95
setLead · Method · 0.95
isLeader · Method · 0.95
getCommittedIndex · Method · 0.95
setCommittedIndex · Method · 0.95
Cleanup · Method · 0.95
applyAll · Method · 0.95
revokeExpiredLeases · Method · 0.95
Panic · Method · 0.80
Notify · Method · 0.80
BecomeLeader · Method · 0.80

Tested by

no test coverage detected