MCPcopy
hub / github.com/dgraph-io/dgraph / UpdateMembership

Method UpdateMembership

dgraph/cmd/zero/zero.go:732–786  ·  view source on GitHub ↗

UpdateMembership updates the membership of the given group.

(ctx context.Context, group *pb.Group)

Source from the content-addressed store, hash-verified

730
731// UpdateMembership updates the membership of the given group.
732func (s *Server) UpdateMembership(ctx context.Context, group *pb.Group) (*api.Payload, error) {
733 // Only Zero leader would get these membership updates.
734 if ts := group.GetCheckpointTs(); ts > 0 {
735 for _, m := range group.GetMembers() {
736 s.Lock()
737 s.checkpointPerGroup[m.GetGroupId()] = ts
738 s.Unlock()
739 }
740 }
741 proposals, err := s.createProposals(group)
742 if err != nil {
743 // Sleep here so the caller doesn't keep on retrying indefinitely, creating a busy
744 // wait.
745 time.Sleep(time.Second)
746 glog.Errorf("Error while creating proposals in Update: %v\n", err)
747 return nil, err
748 }
749
750 ctx, cancel := context.WithCancel(ctx)
751 defer cancel()
752
753 errCh := make(chan error, len(proposals))
754 for _, pr := range proposals {
755 go func(pr *pb.ZeroProposal) {
756 errCh <- s.Node.proposeAndWait(ctx, pr)
757 }(pr)
758 }
759
760 for range proposals {
761 // We Don't care about these errors
762 // Ideally shouldn't error out.
763 if err := <-errCh; err != nil {
764 glog.Errorf("Error while applying proposal in Update stream: %v\n", err)
765 return nil, err
766 }
767 }
768
769 if len(group.Members) == 0 {
770 return &api.Payload{Data: []byte("OK")}, nil
771 }
772 select {
773 case s.moveOngoing <- struct{}{}:
774 default:
775 // If a move is going on, don't do the next steps of deleting predicates.
776 return &api.Payload{Data: []byte("OK")}, nil
777 }
778 defer func() {
779 <-s.moveOngoing
780 }()
781
782 if err := s.deletePredicates(ctx, group); err != nil {
783 glog.Warningf("While deleting predicates: %v", err)
784 }
785 return &api.Payload{Data: []byte("OK")}, nil
786}
787
788func (s *Server) deletePredicates(ctx context.Context, group *pb.Group) error {
789 if group == nil || group.Tablets == nil {

Callers

nothing calls this directly

Calls 10

createProposalsMethod · 0.95
deletePredicatesMethod · 0.95
GetMembersMethod · 0.80
WarningfMethod · 0.80
GetCheckpointTsMethod · 0.45
LockMethod · 0.45
GetGroupIdMethod · 0.45
UnlockMethod · 0.45
ErrorfMethod · 0.45
proposeAndWaitMethod · 0.45

Tested by

no test coverage detected