MCPcopy
hub / github.com/dgraph-io/dgraph / movePredicate

Method movePredicate

dgraph/cmd/zero/tablet.go:115–218  ·  view source on GitHub ↗

movePredicate is the main entry point for the predicate-move logic. This Zero must remain the leader for the entire duration of the predicate move. If this Zero stops being the leader, the final proposal to reassign the tablet to the destination would fail automatically.

(predicate string, srcGroup, dstGroup uint32)

Source from the content-addressed store, hash-verified

113// for the entire duration of predicate move. If this Zero stops being the leader, the final
114// proposal of reassigning the tablet to the destination would fail automatically.
115func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) error {
116 s.moveOngoing <- struct{}{}
117 defer func() {
118 <-s.moveOngoing
119 }()
120
121 ctx, cancel := context.WithTimeout(context.Background(), predicateMoveTimeout)
122 defer cancel()
123
124 span := trace.SpanFromContext(ctx)
125 defer span.End()
126
127 // Ensure that reserved predicates cannot be moved.
128 if x.IsReservedPredicate(predicate) {
129 return errors.Errorf("Unable to move reserved predicate %s", predicate)
130 }
131
132 // Ensure that I'm connected to the rest of the Zero group, and am the leader.
133 if _, err := s.latestMembershipState(ctx); err != nil {
134 return errors.Wrapf(err, "unable to reach quorum")
135 }
136 if !s.Node.AmLeader() {
137 return errors.Errorf("I am not the Zero leader")
138 }
139 tab := s.ServingTablet(predicate)
140 if tab == nil {
141 return errors.Errorf("Tablet to be moved: [%v] is not being served", predicate)
142 }
143 msg := fmt.Sprintf("Going to move predicate: [%v], size: [ondisk: %v, uncompressed: %v]"+
144 " from group %d to %d\n", predicate, humanize.IBytes(uint64(tab.OnDiskBytes)),
145 humanize.IBytes(uint64(tab.UncompressedBytes)), srcGroup, dstGroup)
146 glog.Info(msg)
147 span.SetAttributes(attribute.String("tablet", predicate))
148 span.SetStatus(1, msg)
149
150 // Block all commits on this predicate. Keep them blocked until we return from this function.
151 unblock := s.blockTablet(predicate)
152 defer unblock()
153
154 // Get a new timestamp, beyond which we are sure that no new txns would be committed for this
155 // predicate. Source Alpha leader must reach this timestamp before streaming the data.
156 ids, err := s.Timestamps(ctx, &pb.Num{Val: 1})
157 if err != nil || ids.StartId == 0 {
158 return errors.Wrapf(err, "while leasing txn timestamp. Id: %+v", ids)
159 }
160
161 // Get connection to leader of source group.
162 pl := s.Leader(srcGroup)
163 if pl == nil {
164 return errors.Errorf("No healthy connection found to leader of group %d", srcGroup)
165 }
166 wc := pb.NewWorkerClient(pl.Get())
167 in := &pb.MovePredicatePayload{
168 Predicate: predicate,
169 SourceGid: srcGroup,
170 DestGid: dstGroup,
171 TxnTs: ids.StartId,
172 }

Callers 2

rebalanceTabletsMethod · 0.95
MoveTabletMethod · 0.95

Calls 15

latestMembershipStateMethod · 0.95
ServingTabletMethod · 0.95
blockTabletMethod · 0.95
TimestampsMethod · 0.95
LeaderMethod · 0.95
MovePredicateMethod · 0.95
groupChecksumsMethod · 0.95
IsReservedPredicateFunction · 0.92
NewWorkerClientFunction · 0.92
InfoMethod · 0.80
InfofMethod · 0.80
WarningfMethod · 0.80

Tested by

no test coverage detected