()
| 1120 | } |
| 1121 | |
| 1122 | func (n *node) proposeSnapshot() error { |
| 1123 | lastIdx := x.Min(n.Applied.DoneUntil(), n.cdcTracker.getSeenIndex()) |
| 1124 | // We can't rely upon the Raft entries to determine the minPendingStart, |
| 1125 | // because there are many cases during mutations where we don't commit or |
| 1126 | // abort the transaction. This might happen due to an early error thrown. |
| 1127 | // Only the mutations which make it to Zero for a commit/abort decision have |
| 1128 | // corresponding Delta entries. So, instead of replicating all that logic |
| 1129 | // here, we just use the MinPendingStartTs tracked by the Oracle, and look |
| 1130 | // for that in the logs. |
| 1131 | // |
| 1132 | // So, we iterate over logs. If we hit MinPendingStartTs, that generates our |
| 1133 | // snapshotIdx. In any case, we continue picking up txn updates, to generate |
| 1134 | // a maxCommitTs, which would become the readTs for the snapshot. |
| 1135 | minPendingStart := x.Min(posting.Oracle().MinPendingStartTs(), n.cdcTracker.getTs()) |
| 1136 | snap, err := n.calculateSnapshot(0, lastIdx, minPendingStart) |
| 1137 | if err != nil { |
| 1138 | return err |
| 1139 | } |
| 1140 | if snap == nil { |
| 1141 | return nil |
| 1142 | } |
| 1143 | proposal := &pb.Proposal{ |
| 1144 | Snapshot: snap, |
| 1145 | } |
| 1146 | glog.V(2).Infof("Proposing snapshot: %+v\n", snap) |
| 1147 | sz := proto.Size(proposal) |
| 1148 | data := make([]byte, 8+sz) |
| 1149 | x.Check2(x.MarshalToSizedBuffer(data[8:], proposal)) |
| 1150 | data = data[:8+sz] |
| 1151 | return n.Raft().Propose(n.ctx, data) |
| 1152 | } |
| 1153 | |
| 1154 | const ( |
| 1155 | maxPendingSize int64 = 256 << 20 // in bytes. |
no test coverage detected