UpdateMembership updates the membership of the given group.
(ctx context.Context, group *pb.Group)
| 730 | |
| 731 | // UpdateMembership updates the membership of the given group. |
| 732 | func (s *Server) UpdateMembership(ctx context.Context, group *pb.Group) (*api.Payload, error) { |
| 733 | // Only Zero leader would get these membership updates. |
| 734 | if ts := group.GetCheckpointTs(); ts > 0 { |
| 735 | for _, m := range group.GetMembers() { |
| 736 | s.Lock() |
| 737 | s.checkpointPerGroup[m.GetGroupId()] = ts |
| 738 | s.Unlock() |
| 739 | } |
| 740 | } |
| 741 | proposals, err := s.createProposals(group) |
| 742 | if err != nil { |
| 743 | // Sleep here so the caller doesn't keep on retrying indefinitely, creating a busy |
| 744 | // wait. |
| 745 | time.Sleep(time.Second) |
| 746 | glog.Errorf("Error while creating proposals in Update: %v\n", err) |
| 747 | return nil, err |
| 748 | } |
| 749 | |
| 750 | ctx, cancel := context.WithCancel(ctx) |
| 751 | defer cancel() |
| 752 | |
| 753 | errCh := make(chan error, len(proposals)) |
| 754 | for _, pr := range proposals { |
| 755 | go func(pr *pb.ZeroProposal) { |
| 756 | errCh <- s.Node.proposeAndWait(ctx, pr) |
| 757 | }(pr) |
| 758 | } |
| 759 | |
| 760 | for range proposals { |
| 761 | // We Don't care about these errors |
| 762 | // Ideally shouldn't error out. |
| 763 | if err := <-errCh; err != nil { |
| 764 | glog.Errorf("Error while applying proposal in Update stream: %v\n", err) |
| 765 | return nil, err |
| 766 | } |
| 767 | } |
| 768 | |
| 769 | if len(group.Members) == 0 { |
| 770 | return &api.Payload{Data: []byte("OK")}, nil |
| 771 | } |
| 772 | select { |
| 773 | case s.moveOngoing <- struct{}{}: |
| 774 | default: |
| 775 | // If a move is going on, don't do the next steps of deleting predicates. |
| 776 | return &api.Payload{Data: []byte("OK")}, nil |
| 777 | } |
| 778 | defer func() { |
| 779 | <-s.moveOngoing |
| 780 | }() |
| 781 | |
| 782 | if err := s.deletePredicates(ctx, group); err != nil { |
| 783 | glog.Warningf("While deleting predicates: %v", err) |
| 784 | } |
| 785 | return &api.Payload{Data: []byte("OK")}, nil |
| 786 | } |
| 787 | |
| 788 | func (s *Server) deletePredicates(ctx context.Context, group *pb.Group) error { |
| 789 | if group == nil || group.Tablets == nil { |
nothing calls this directly
no test coverage detected