proposeAndWait sends a proposal through RAFT. It waits on a channel for the proposal to be applied(written to WAL) to all the nodes in the group.
(ctx context.Context, proposal *pb.Proposal)
| 123 | // proposeAndWait sends a proposal through RAFT. It waits on a channel for the proposal |
| 124 | // to be applied(written to WAL) to all the nodes in the group. |
| 125 | func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) (perr error) { |
| 126 | startTime := time.Now() |
| 127 | ctx = x.WithMethod(ctx, "n.proposeAndWait") |
| 128 | defer func() { |
| 129 | v := x.TagValueStatusOK |
| 130 | if perr != nil { |
| 131 | v = x.TagValueStatusError |
| 132 | } |
| 133 | ctx, _ = tag.New(ctx, tag.Upsert(x.KeyStatus, v)) |
| 134 | timeMs := x.SinceMs(startTime) |
| 135 | ostats.Record(ctx, x.LatencyMs.M(timeMs)) |
| 136 | }() |
| 137 | |
| 138 | if n.Raft() == nil { |
| 139 | return errors.Errorf("Raft isn't initialized yet") |
| 140 | } |
| 141 | if ctx.Err() != nil { |
| 142 | return ctx.Err() |
| 143 | } |
| 144 | // Set this to disable retrying mechanism, and using the user-specified |
| 145 | // timeout. |
| 146 | var noTimeout bool |
| 147 | |
| 148 | checkTablet := func(pred string) error { |
| 149 | tablet, err := groups().Tablet(pred) |
| 150 | switch { |
| 151 | case err != nil: |
| 152 | return err |
| 153 | case tablet == nil || tablet.GroupId == 0: |
| 154 | return errNonExistentTablet |
| 155 | case tablet.GroupId != groups().groupId(): |
| 156 | return errUnservedTablet |
| 157 | default: |
| 158 | return nil |
| 159 | } |
| 160 | } |
| 161 | |
| 162 | // Do a type check here if schema is present |
| 163 | // In very rare cases invalid entries might pass through raft, which would |
| 164 | // be persisted, we do best effort schema check while writing |
| 165 | ctx = schema.GetWriteContext(ctx) |
| 166 | if proposal.Mutations != nil { |
| 167 | for _, edge := range proposal.Mutations.Edges { |
| 168 | if err := checkTablet(edge.Attr); err != nil { |
| 169 | return err |
| 170 | } |
| 171 | su, ok := schema.State().Get(ctx, edge.Attr) |
| 172 | if !ok { |
| 173 | // We don't allow mutations for reserved predicates if the schema for them doesn't |
| 174 | // already exist. A predicate owned by a registered ReservedNamespace |
| 175 | // (see x.RegisterReservedNamespace) is an exception — its owner registers it via |
| 176 | // Alter as schemas are written. |
| 177 | if x.IsReservedPredicate(edge.Attr) && !x.IsRegisteredReservedPredicate(edge.Attr) { |
| 178 | return errors.Errorf("Can't store predicate `%s` as it is prefixed with "+ |
| 179 | "`dgraph.` which is reserved as the namespace for dgraph's internal "+ |
| 180 | "types/predicates.", |
| 181 | x.ParseAttr(edge.Attr)) |
| 182 | } |
no test coverage detected