movePredicate is the main entry point for the predicate-move logic. This Zero must remain the leader for the entire duration of the predicate move. If this Zero stops being the leader, the final proposal to reassign the tablet to the destination will fail automatically.
(predicate string, srcGroup, dstGroup uint32)
| 113 | // for the entire duration of predicate move. If this Zero stops being the leader, the final |
| 114 | // proposal of reassigning the tablet to the destination would fail automatically. |
| 115 | func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) error { |
| 116 | s.moveOngoing <- struct{}{} |
| 117 | defer func() { |
| 118 | <-s.moveOngoing |
| 119 | }() |
| 120 | |
| 121 | ctx, cancel := context.WithTimeout(context.Background(), predicateMoveTimeout) |
| 122 | defer cancel() |
| 123 | |
| 124 | span := trace.SpanFromContext(ctx) |
| 125 | defer span.End() |
| 126 | |
| 127 | // Ensure that reserved predicates cannot be moved. |
| 128 | if x.IsReservedPredicate(predicate) { |
| 129 | return errors.Errorf("Unable to move reserved predicate %s", predicate) |
| 130 | } |
| 131 | |
| 132 | // Ensure that I'm connected to the rest of the Zero group, and am the leader. |
| 133 | if _, err := s.latestMembershipState(ctx); err != nil { |
| 134 | return errors.Wrapf(err, "unable to reach quorum") |
| 135 | } |
| 136 | if !s.Node.AmLeader() { |
| 137 | return errors.Errorf("I am not the Zero leader") |
| 138 | } |
| 139 | tab := s.ServingTablet(predicate) |
| 140 | if tab == nil { |
| 141 | return errors.Errorf("Tablet to be moved: [%v] is not being served", predicate) |
| 142 | } |
| 143 | msg := fmt.Sprintf("Going to move predicate: [%v], size: [ondisk: %v, uncompressed: %v]"+ |
| 144 | " from group %d to %d\n", predicate, humanize.IBytes(uint64(tab.OnDiskBytes)), |
| 145 | humanize.IBytes(uint64(tab.UncompressedBytes)), srcGroup, dstGroup) |
| 146 | glog.Info(msg) |
| 147 | span.SetAttributes(attribute.String("tablet", predicate)) |
| 148 | span.SetStatus(1, msg) |
| 149 | |
| 150 | // Block all commits on this predicate. Keep them blocked until we return from this function. |
| 151 | unblock := s.blockTablet(predicate) |
| 152 | defer unblock() |
| 153 | |
| 154 | // Get a new timestamp, beyond which we are sure that no new txns would be committed for this |
| 155 | // predicate. Source Alpha leader must reach this timestamp before streaming the data. |
| 156 | ids, err := s.Timestamps(ctx, &pb.Num{Val: 1}) |
| 157 | if err != nil || ids.StartId == 0 { |
| 158 | return errors.Wrapf(err, "while leasing txn timestamp. Id: %+v", ids) |
| 159 | } |
| 160 | |
| 161 | // Get connection to leader of source group. |
| 162 | pl := s.Leader(srcGroup) |
| 163 | if pl == nil { |
| 164 | return errors.Errorf("No healthy connection found to leader of group %d", srcGroup) |
| 165 | } |
| 166 | wc := pb.NewWorkerClient(pl.Get()) |
| 167 | in := &pb.MovePredicatePayload{ |
| 168 | Predicate: predicate, |
| 169 | SourceGid: srcGroup, |
| 170 | DestGid: dstGroup, |
| 171 | TxnTs: ids.StartId, |
| 172 | } |
no test coverage detected