proposeAndWait makes a proposal to the quorum for Group Zero and waits for it to be accepted by the group before returning. It is safe to call concurrently.
(ctx context.Context, proposal *pb.ZeroProposal)
| 95 | // proposeAndWait makes a proposal to the quorum for Group Zero and waits for it to be accepted by |
| 96 | // the group before returning. It is safe to call concurrently. |
| 97 | func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) error { |
| 98 | switch { |
| 99 | case n.Raft() == nil: |
| 100 | return errors.Errorf("Raft isn't initialized yet.") |
| 101 | case ctx.Err() != nil: |
| 102 | return ctx.Err() |
| 103 | case !n.AmLeader(): |
| 104 | // Do this check upfront. Don't do this inside propose for reasons explained below. |
| 105 | return errors.Errorf("Not Zero leader. Aborting proposal: %+v", proposal) |
| 106 | } |
| 107 | |
| 108 | // We could consider adding a wrapper around the user proposal, so we can access any key-values. |
| 109 | // Something like this: |
| 110 | // https://github.com/golang/go/commit/5d39260079b5170e6b4263adb4022cc4b54153c4 |
| 111 | span := trace.SpanFromContext(ctx) |
| 112 | // Overwrite ctx, so we no longer enforce the timeouts or cancels from ctx. |
| 113 | ctx = trace.ContextWithSpan(context.Background(), span) |
| 114 | |
| 115 | stop := x.SpanTimer(span, "n.proposeAndWait") |
| 116 | defer stop() |
| 117 | |
| 118 | // propose runs in a loop. So, we should not do any checks inside, including n.AmLeader. This is |
| 119 | // to avoid the scenario where the first proposal times out and the second one gets returned |
| 120 | // due to node no longer being the leader. In this scenario, the first proposal can still get |
| 121 | // accepted by Raft, causing a txn violation later for us, because we assumed that the proposal |
| 122 | // did not go through. |
| 123 | propose := func(timeout time.Duration) error { |
| 124 | cctx, cancel := context.WithTimeout(ctx, timeout) |
| 125 | defer cancel() |
| 126 | |
| 127 | errCh := make(chan error, 1) |
| 128 | pctx := &conn.ProposalCtx{ |
| 129 | ErrCh: errCh, |
| 130 | // Don't use the original context, because that's not what we're passing to Raft. |
| 131 | Ctx: cctx, |
| 132 | } |
| 133 | key := n.uniqueKey() |
| 134 | // The unique key is randomly generated, so it could collide with an existing key. |
| 135 | // Loop to ensure that even if a collision occurs, we retry with a fresh key. |
| 136 | for !n.Proposals.Store(key, pctx) { |
| 137 | glog.Warningf("Found existing proposal with key: [%v]", key) |
| 138 | key = n.uniqueKey() |
| 139 | } |
| 140 | defer n.Proposals.Delete(key) |
| 141 | span.AddEvent("Proposing with key: %d. Timeout: %v", trace.WithAttributes( |
| 142 | attribute.Int64("key", int64(key)), |
| 143 | attribute.Int64("timeout", int64(timeout)))) |
| 144 | |
| 145 | sz := proto.Size(proposal) |
| 146 | data := make([]byte, 8+sz) |
| 147 | binary.BigEndian.PutUint64(data[:8], key) |
| 148 | _, err := x.MarshalToSizedBuffer(data[8:], proposal) |
| 149 | if err != nil { |
| 150 | return err |
| 151 | } |
| 152 | data = data[:8+sz] |
| 153 | // Propose the change. |
| 154 | if err := n.Raft().Propose(cctx, data); err != nil { |
no test coverage detected