| 802 | } |
| 803 | |
| 804 | func (n *node) checkQuorum(closer *z.Closer) { |
| 805 | defer closer.Done() |
| 806 | ticker := time.Tick(time.Second) |
| 807 | |
| 808 | quorum := func() { |
| 809 | // Make this timeout 1.5x the timeout on RunReadIndexLoop. |
| 810 | ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) |
| 811 | defer cancel() |
| 812 | |
| 813 | ctx, span := otel.Tracer("").Start(ctx, "Zero.checkQuorum") |
| 814 | defer span.End() |
| 815 | span.SetAttributes(attribute.String("Node id", fmt.Sprintf("%d", n.Id))) |
| 816 | |
| 817 | if state, err := n.server.latestMembershipState(ctx); err == nil { |
| 818 | n.mu.Lock() |
| 819 | n.lastQuorum = time.Now() |
| 820 | n.mu.Unlock() |
| 821 | // Also do some connection cleanup. |
| 822 | conn.GetPools().RemoveInvalid(state) |
| 823 | span.AddEvent("Updated lastQuorum") |
| 824 | |
| 825 | } else if glog.V(1) { |
| 826 | span.AddEvent(fmt.Sprintf("Got error: %v", err)) |
| 827 | glog.Warningf("Zero node: %#x unable to reach quorum. Error: %v", n.Id, err) |
| 828 | } |
| 829 | } |
| 830 | |
| 831 | for { |
| 832 | select { |
| 833 | case <-ticker: |
| 834 | // Only the leader needs to check for the quorum. The quorum is |
| 835 | // used by a leader to identify if it is behind a network partition. |
| 836 | if n.amLeader() { |
| 837 | quorum() |
| 838 | } |
| 839 | case <-closer.HasBeenClosed(): |
| 840 | return |
| 841 | } |
| 842 | } |
| 843 | } |
| 844 | |
| 845 | func (n *node) snapshotPeriodically(closer *z.Closer) { |
| 846 | defer closer.Done() |