(ctx context.Context, group *pb.Group)
| 786 | } |
| 787 | |
| 788 | func (s *Server) deletePredicates(ctx context.Context, group *pb.Group) error { |
| 789 | if group == nil || group.Tablets == nil { |
| 790 | return nil |
| 791 | } |
| 792 | var gid uint32 |
| 793 | for _, tablet := range group.Tablets { |
| 794 | gid = tablet.GroupId |
| 795 | break |
| 796 | } |
| 797 | if gid == 0 { |
| 798 | return errors.Errorf("Unable to find group") |
| 799 | } |
| 800 | state, err := s.latestMembershipState(ctx) |
| 801 | if err != nil { |
| 802 | return err |
| 803 | } |
| 804 | sg, ok := state.Groups[gid] |
| 805 | if !ok { |
| 806 | return errors.Errorf("Unable to find group: %d", gid) |
| 807 | } |
| 808 | |
| 809 | pl := s.Leader(gid) |
| 810 | if pl == nil { |
| 811 | return errors.Errorf("Unable to reach leader of group: %d", gid) |
| 812 | } |
| 813 | wc := pb.NewWorkerClient(pl.Get()) |
| 814 | |
| 815 | for pred := range group.Tablets { |
| 816 | if _, found := sg.Tablets[pred]; found { |
| 817 | continue |
| 818 | } |
| 819 | glog.Infof("Tablet: %v does not belong to group: %d. Sending delete instruction.", |
| 820 | pred, gid) |
| 821 | in := &pb.MovePredicatePayload{ |
| 822 | Predicate: pred, |
| 823 | SourceGid: gid, |
| 824 | DestGid: 0, |
| 825 | } |
| 826 | if _, err := wc.MovePredicate(ctx, in); err != nil { |
| 827 | return err |
| 828 | } |
| 829 | } |
| 830 | return nil |
| 831 | } |
| 832 | |
| 833 | // StreamMembership periodically streams the membership state to the given stream. |
| 834 | func (s *Server) StreamMembership(_ *api.Payload, stream pb.Zero_StreamMembershipServer) error { |
no test coverage detected