MCPcopy
hub / github.com/dgraph-io/dgraph / MovePredicate

Method MovePredicate

worker/predicate_move.go:189–255  ·  view source on GitHub ↗
(ctx context.Context,
	in *pb.MovePredicatePayload)

Source from the content-addressed store, hash-verified

187}
188
189func (w *grpcWorker) MovePredicate(ctx context.Context,
190 in *pb.MovePredicatePayload) (*api.Payload, error) {
191 ctx, span := trace.SpanFromContext(ctx).TracerProvider().Tracer("grpcWorker").Start(ctx, "MovePredicate")
192 defer span.End()
193
194 n := groups().Node
195 if !n.AmLeader() {
196 return &emptyPayload, errNotLeader
197 }
198 // Don't do a predicate move if the cluster is in draining mode.
199 if err := x.HealthCheck(); err != nil {
200 return &emptyPayload, errors.Wrap(err, "Move predicate request rejected")
201 }
202
203 if groups().groupId() != in.SourceGid {
204 return &emptyPayload,
205 errors.Errorf("Group id doesn't match, received request for %d, my gid: %d",
206 in.SourceGid, groups().groupId())
207 }
208 if len(in.Predicate) == 0 {
209 return &emptyPayload, errEmptyPredicate
210 }
211
212 // TODO: find a better way to avoid moving __vector_ predicates.
213 if in.DestGid == 0 && !strings.Contains(in.Predicate, hnsw.VecKeyword) {
214 glog.Infof("Was instructed to delete tablet: %v", in.Predicate)
215 // Expected Checksum ensures that all the members of this group would block until they get
216 // the latest membership status where this predicate now belongs to another group. So they
217 // know that they are no longer serving this predicate, before they delete it from their
218 // state. Without this checksum, the members could end up deleting the predicate and then
219 // serve a request asking for that predicate, causing Jepsen failures.
220 p := &pb.Proposal{
221 CleanPredicate: in.Predicate,
222 ExpectedChecksum: in.ExpectedChecksum,
223 StartTs: in.TxnTs,
224 }
225 return &emptyPayload, groups().Node.proposeAndWait(ctx, p)
226 }
227
228 if strings.Contains(in.Predicate, hnsw.VecKeyword) {
229 return &emptyPayload, nil
230 }
231
232 if err := posting.Oracle().WaitForTs(ctx, in.TxnTs); err != nil {
233 return &emptyPayload, errors.Errorf("While waiting for txn ts: %d. Error: %v", in.TxnTs, err)
234 }
235
236 gid, err := groups().BelongsTo(in.Predicate)
237 switch {
238 case err != nil:
239 return &emptyPayload, err
240 case gid == 0:
241 return &emptyPayload, errNonExistentTablet
242 case gid != groups().groupId():
243 return &emptyPayload, errUnservedTablet
244 }
245
246 msg := fmt.Sprintf("Move predicate request: %+v", in)

Callers

nothing calls this directly

Calls 15

HealthCheck · Function · 0.92
Oracle · Function · 0.92
groups · Function · 0.85
movePredicateHelper · Function · 0.85
groupId · Method · 0.80
Infof · Method · 0.80
WaitForTs · Method · 0.80
BelongsTo · Method · 0.80
Info · Method · 0.80
Start · Method · 0.65
AmLeader · Method · 0.45
Errorf · Method · 0.45

Tested by

no test coverage detected