MCPcopy
hub / github.com/dgraph-io/dgraph / proposeAndWait

Method proposeAndWait

worker/proposal.go:125–312  ·  view source on GitHub ↗

proposeAndWait sends a proposal through RAFT. It waits on a channel for the proposal to be applied(written to WAL) to all the nodes in the group.

(ctx context.Context, proposal *pb.Proposal)

Source from the content-addressed store, hash-verified

123// proposeAndWait sends a proposal through RAFT. It waits on a channel for the proposal
124// to be applied(written to WAL) to all the nodes in the group.
125func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) (perr error) {
126 startTime := time.Now()
127 ctx = x.WithMethod(ctx, "n.proposeAndWait")
128 defer func() {
129 v := x.TagValueStatusOK
130 if perr != nil {
131 v = x.TagValueStatusError
132 }
133 ctx, _ = tag.New(ctx, tag.Upsert(x.KeyStatus, v))
134 timeMs := x.SinceMs(startTime)
135 ostats.Record(ctx, x.LatencyMs.M(timeMs))
136 }()
137
138 if n.Raft() == nil {
139 return errors.Errorf("Raft isn't initialized yet")
140 }
141 if ctx.Err() != nil {
142 return ctx.Err()
143 }
144 // Set this to disable retrying mechanism, and using the user-specified
145 // timeout.
146 var noTimeout bool
147
148 checkTablet := func(pred string) error {
149 tablet, err := groups().Tablet(pred)
150 switch {
151 case err != nil:
152 return err
153 case tablet == nil || tablet.GroupId == 0:
154 return errNonExistentTablet
155 case tablet.GroupId != groups().groupId():
156 return errUnservedTablet
157 default:
158 return nil
159 }
160 }
161
162 // Do a type check here if schema is present
163 // In very rare cases invalid entries might pass through raft, which would
164 // be persisted, we do best effort schema check while writing
165 ctx = schema.GetWriteContext(ctx)
166 if proposal.Mutations != nil {
167 for _, edge := range proposal.Mutations.Edges {
168 if err := checkTablet(edge.Attr); err != nil {
169 return err
170 }
171 su, ok := schema.State().Get(ctx, edge.Attr)
172 if !ok {
173 // We don't allow mutations for reserved predicates if the schema for them doesn't
174 // already exist. A predicate owned by a registered ReservedNamespace
175 // (see x.RegisterReservedNamespace) is an exception — its owner registers it via
176 // Alter as schemas are written.
177 if x.IsReservedPredicate(edge.Attr) && !x.IsRegisteredReservedPredicate(edge.Attr) {
178 return errors.Errorf("Can't store predicate `%s` as it is prefixed with "+
179 "`dgraph.` which is reserved as the namespace for dgraph's internal "+
180 "types/predicates.",
181 x.ParseAttr(edge.Attr))
182 }

Callers 7

blockingAbortMethod · 0.95
DeleteNamespaceMethod · 0.45
RestoreMethod · 0.45
batchAndProposeKeyValuesFunction · 0.45
MovePredicateMethod · 0.45

Calls 15

WithMethodFunction · 0.92
SinceMsFunction · 0.92
GetWriteContextFunction · 0.92
StateFunction · 0.92
IsReservedPredicateFunction · 0.92
ParseAttrFunction · 0.92
MarshalToSizedBufferFunction · 0.92
SpanTimerFunction · 0.92
AssertTruefFunction · 0.92
groupsFunction · 0.85
ValidateAndConvertFunction · 0.85

Tested by

no test coverage detected