| 962 | const tickDur = 100 * time.Millisecond |
| 963 | |
| 964 | func (n *node) Run() { |
| 965 | // lastLead is for detecting leadership changes |
| 966 | // |
| 967 | // etcd has a similar mechanism for tracking leader changes, with their |
| 968 | // raftReadyHandler.getLead() function that returns the previous leader |
| 969 | lastLead := uint64(math.MaxUint64) |
| 970 | |
| 971 | var leader bool |
| 972 | ticker := time.Tick(tickDur) |
| 973 | |
| 974 | // snapshot can cause select loop to block while deleting entries, so run |
| 975 | // it in goroutine |
| 976 | readStateCh := make(chan raft.ReadState, 100) |
| 977 | closer := z.NewCloser(0) |
| 978 | defer func() { |
| 979 | closer.SignalAndWait() |
| 980 | n.closer.Done() |
| 981 | glog.Infof("Zero Node.Run finished.") |
| 982 | }() |
| 983 | |
| 984 | closer.AddRunning(1) |
| 985 | go n.snapshotPeriodically(closer) |
| 986 | |
| 987 | closer.AddRunning(1) |
| 988 | go n.updateZeroMembershipPeriodically(closer) |
| 989 | |
| 990 | closer.AddRunning(1) |
| 991 | go n.checkQuorum(closer) |
| 992 | |
| 993 | closer.AddRunning(1) |
| 994 | go n.RunReadIndexLoop(closer, readStateCh) |
| 995 | |
| 996 | if !x.WorkerConfig.HardSync { |
| 997 | closer.AddRunning(1) |
| 998 | go x.StoreSync(n.Store, closer) |
| 999 | } |
| 1000 | // We only stop runReadIndexLoop after the for loop below has finished interacting with it. |
| 1001 | // That way we know sending to readStateCh will not deadlock. |
| 1002 | |
| 1003 | var timer x.Timer |
| 1004 | for { |
| 1005 | select { |
| 1006 | case <-n.closer.HasBeenClosed(): |
| 1007 | n.Raft().Stop() |
| 1008 | return |
| 1009 | case <-ticker: |
| 1010 | n.Raft().Tick() |
| 1011 | case rd := <-n.Raft().Ready(): |
| 1012 | timer.Start() |
| 1013 | _, span := otel.Tracer("").Start(n.ctx, "Zero.RunLoop") |
| 1014 | for _, rs := range rd.ReadStates { |
| 1015 | // No need to use select-case-default on pushing to readStateCh. It is typically |
| 1016 | // empty. |
| 1017 | readStateCh <- rs |
| 1018 | } |
| 1019 | span.AddEvent(fmt.Sprintf("Pushed %d readstates", len(rd.ReadStates))) |
| 1020 | |
| 1021 | if rd.SoftState != nil { |