MCPcopy
hub / github.com/dgraph-io/dgraph / StreamMembership

Method StreamMembership

dgraph/cmd/zero/zero.go:834–864  ·  view source on GitHub ↗

StreamMembership periodically streams the membership state to the given stream.

(_ *api.Payload, stream pb.Zero_StreamMembershipServer)

Source from the content-addressed store, hash-verified

832
833// StreamMembership periodically streams the membership state to the given stream.
834func (s *Server) StreamMembership(_ *api.Payload, stream pb.Zero_StreamMembershipServer) error {
835 // Send MembershipState right away. So, the connection is correctly established.
836 ctx := stream.Context()
837 ms, err := s.latestMembershipState(ctx)
838 if err != nil {
839 return err
840 }
841 if err := stream.Send(ms); err != nil {
842 return err
843 }
844
845 ticker := time.Tick(time.Second)
846
847 for {
848 select {
849 case <-ticker:
850 // Send an update every second.
851 ms, err := s.latestMembershipState(ctx)
852 if err != nil {
853 return err
854 }
855 if err := stream.Send(ms); err != nil {
856 return err
857 }
858 case <-ctx.Done():
859 return ctx.Err()
860 case <-s.closer.HasBeenClosed():
861 return errServerShutDown
862 }
863 }
864}
865
866func (s *Server) latestMembershipState(ctx context.Context) (*pb.MembershipState, error) {
867 if err := s.Node.WaitLinearizableRead(ctx); err != nil {

Callers

nothing calls this directly

Calls 3

latestMembershipStateMethod · 0.95
SendMethod · 0.65
DoneMethod · 0.45

Tested by

no test coverage detected