(ctx context.Context, in *pb.MovePredicatePayload)
| 187 | } |
| 188 | |
| 189 | func (w *grpcWorker) MovePredicate(ctx context.Context, |
| 190 | in *pb.MovePredicatePayload) (*api.Payload, error) { |
| 191 | ctx, span := trace.SpanFromContext(ctx).TracerProvider().Tracer("grpcWorker").Start(ctx, "MovePredicate") |
| 192 | defer span.End() |
| 193 | |
| 194 | n := groups().Node |
| 195 | if !n.AmLeader() { |
| 196 | return &emptyPayload, errNotLeader |
| 197 | } |
| 198 | // Don't do a predicate move if the cluster is in draining mode. |
| 199 | if err := x.HealthCheck(); err != nil { |
| 200 | return &emptyPayload, errors.Wrap(err, "Move predicate request rejected") |
| 201 | } |
| 202 | |
| 203 | if groups().groupId() != in.SourceGid { |
| 204 | return &emptyPayload, |
| 205 | errors.Errorf("Group id doesn't match, received request for %d, my gid: %d", |
| 206 | in.SourceGid, groups().groupId()) |
| 207 | } |
| 208 | if len(in.Predicate) == 0 { |
| 209 | return &emptyPayload, errEmptyPredicate |
| 210 | } |
| 211 | |
| 212 | // TODO: find a better way to avoid moving __vector_ predicates. |
| 213 | if in.DestGid == 0 && !strings.Contains(in.Predicate, hnsw.VecKeyword) { |
| 214 | glog.Infof("Was instructed to delete tablet: %v", in.Predicate) |
| 215 | // Expected Checksum ensures that all the members of this group would block until they get |
| 216 | // the latest membership status where this predicate now belongs to another group. So they |
| 217 | // know that they are no longer serving this predicate, before they delete it from their |
| 218 | // state. Without this checksum, the members could end up deleting the predicate and then |
| 219 | // serve a request asking for that predicate, causing Jepsen failures. |
| 220 | p := &pb.Proposal{ |
| 221 | CleanPredicate: in.Predicate, |
| 222 | ExpectedChecksum: in.ExpectedChecksum, |
| 223 | StartTs: in.TxnTs, |
| 224 | } |
| 225 | return &emptyPayload, groups().Node.proposeAndWait(ctx, p) |
| 226 | } |
| 227 | |
| 228 | if strings.Contains(in.Predicate, hnsw.VecKeyword) { |
| 229 | return &emptyPayload, nil |
| 230 | } |
| 231 | |
| 232 | if err := posting.Oracle().WaitForTs(ctx, in.TxnTs); err != nil { |
| 233 | return &emptyPayload, errors.Errorf("While waiting for txn ts: %d. Error: %v", in.TxnTs, err) |
| 234 | } |
| 235 | |
| 236 | gid, err := groups().BelongsTo(in.Predicate) |
| 237 | switch { |
| 238 | case err != nil: |
| 239 | return &emptyPayload, err |
| 240 | case gid == 0: |
| 241 | return &emptyPayload, errNonExistentTablet |
| 242 | case gid != groups().groupId(): |
| 243 | return &emptyPayload, errUnservedTablet |
| 244 | } |
| 245 | |
| 246 | msg := fmt.Sprintf("Move predicate request: %+v", in) |
nothing calls this directly
no test coverage detected