StreamMembership periodically streams the membership state to the given stream.
(_ *api.Payload, stream pb.Zero_StreamMembershipServer)
| 832 | |
| 833 | // StreamMembership periodically streams the membership state to the given stream. |
| 834 | func (s *Server) StreamMembership(_ *api.Payload, stream pb.Zero_StreamMembershipServer) error { |
| 835 | // Send MembershipState right away. So, the connection is correctly established. |
| 836 | ctx := stream.Context() |
| 837 | ms, err := s.latestMembershipState(ctx) |
| 838 | if err != nil { |
| 839 | return err |
| 840 | } |
| 841 | if err := stream.Send(ms); err != nil { |
| 842 | return err |
| 843 | } |
| 844 | |
| 845 | ticker := time.Tick(time.Second) |
| 846 | |
| 847 | for { |
| 848 | select { |
| 849 | case <-ticker: |
| 850 | // Send an update every second. |
| 851 | ms, err := s.latestMembershipState(ctx) |
| 852 | if err != nil { |
| 853 | return err |
| 854 | } |
| 855 | if err := stream.Send(ms); err != nil { |
| 856 | return err |
| 857 | } |
| 858 | case <-ctx.Done(): |
| 859 | return ctx.Err() |
| 860 | case <-s.closer.HasBeenClosed(): |
| 861 | return errServerShutDown |
| 862 | } |
| 863 | } |
| 864 | } |
| 865 | |
| 866 | func (s *Server) latestMembershipState(ctx context.Context) (*pb.MembershipState, error) { |
| 867 | if err := s.Node.WaitLinearizableRead(ctx); err != nil { |
nothing calls this directly
no test coverage detected