hub / github.com/dgraph-io/dgraph / proposeAndWait

Method proposeAndWait

dgraph/cmd/zero/raft.go:97–185

proposeAndWait makes a proposal to the quorum for Group Zero and waits for it to be accepted by the group before returning. It is safe to call concurrently.

(ctx context.Context, proposal *pb.ZeroProposal)

Source from the content-addressed store, hash-verified

// proposeAndWait makes a proposal to the quorum for Group Zero and waits for it to be accepted by
// the group before returning. It is safe to call concurrently.
func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) error {
	switch {
	case n.Raft() == nil:
		return errors.Errorf("Raft isn't initialized yet.")
	case ctx.Err() != nil:
		return ctx.Err()
	case !n.AmLeader():
		// Do this check upfront. Don't do this inside propose for reasons explained below.
		return errors.Errorf("Not Zero leader. Aborting proposal: %+v", proposal)
	}

	// We could consider adding a wrapper around the user proposal, so we can access any key-values.
	// Something like this:
	// https://github.com/golang/go/commit/5d39260079b5170e6b4263adb4022cc4b54153c4
	span := trace.SpanFromContext(ctx)
	// Overwrite ctx, so we no longer enforce the timeouts or cancels from ctx.
	ctx = trace.ContextWithSpan(context.Background(), span)

	stop := x.SpanTimer(span, "n.proposeAndWait")
	defer stop()

	// propose runs in a loop. So, we should not do any checks inside, including n.AmLeader. This is
	// to avoid the scenario where the first proposal times out and the second one gets returned
	// due to node no longer being the leader. In this scenario, the first proposal can still get
	// accepted by Raft, causing a txn violation later for us, because we assumed that the proposal
	// did not go through.
	propose := func(timeout time.Duration) error {
		cctx, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()

		errCh := make(chan error, 1)
		pctx := &conn.ProposalCtx{
			ErrCh: errCh,
			// Don't use the original context, because that's not what we're passing to Raft.
			Ctx: cctx,
		}
		key := n.uniqueKey()
		// unique key is randomly generated key and could have collision.
		// This is to ensure that even if collision occurs, we retry.
		for !n.Proposals.Store(key, pctx) {
			glog.Warningf("Found existing proposal with key: [%v]", key)
			key = n.uniqueKey()
		}
		defer n.Proposals.Delete(key)
		span.AddEvent("Proposing with key: %d. Timeout: %v", trace.WithAttributes(
			attribute.Int64("key", int64(key)),
			attribute.Int64("timeout", int64(timeout))))

		sz := proto.Size(proposal)
		data := make([]byte, 8+sz)
		binary.BigEndian.PutUint64(data[:8], key)
		_, err := x.MarshalToSizedBuffer(data[8:], proposal)
		if err != nil {
			return err
		}
		data = data[:8+sz]
		// Propose the change.
		if err := n.Raft().Propose(cctx, data); err != nil {
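The listing above builds the Raft payload as an 8-byte big-endian key followed by the marshaled proposal. That layout can be sketched on its own with plain byte slices. In the example below, `frame` and `unframe` are hypothetical helpers, the payload is an arbitrary byte slice rather than a protobuf message, and the decoding side is an assumption, since the excerpt does not show how the receiver splits the buffer.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// frame prefixes payload with key encoded as 8 big-endian bytes.
func frame(key uint64, payload []byte) []byte {
	data := make([]byte, 8+len(payload))
	binary.BigEndian.PutUint64(data[:8], key)
	copy(data[8:], payload)
	return data
}

// unframe is an assumed counterpart that splits a framed buffer back into
// its key and payload. The payload slice aliases the input buffer.
func unframe(data []byte) (uint64, []byte, error) {
	if len(data) < 8 {
		return 0, nil, errors.New("buffer shorter than the 8-byte key prefix")
	}
	return binary.BigEndian.Uint64(data[:8]), data[8:], nil
}

func main() {
	data := frame(42, []byte("proposal"))
	key, payload, err := unframe(data)
	fmt.Println(key, string(payload), err)
}
```

Carrying the key inside the payload lets whichever component later reads the committed entry look up the matching waiter by key, without any side channel between proposer and applier.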

Callers 11

proposeNewCID · Method · 0.95
Inform · Method · 0.45
RemoveNode · Method · 0.45
Connect · Method · 0.45
DeleteNamespace · Method · 0.45
ShouldServe · Method · 0.45
UpdateMembership · Method · 0.45
lease · Method · 0.45
proposeTxn · Method · 0.45
movePredicate · Method · 0.45

Calls 13

AmLeader · Method · 0.95
uniqueKey · Method · 0.95
SpanTimer · Function · 0.92
MarshalToSizedBuffer · Function · 0.92
Raft · Method · 0.80
Warningf · Method · 0.80
Errorf · Method · 0.45
Store · Method · 0.45
Delete · Method · 0.45
Size · Method · 0.45
String · Method · 0.45
Error · Method · 0.45

Tested by

no test coverage detected