(done chan struct{})
| 1193 | } |
| 1194 | |
| 1195 | func (n *node) checkpointAndClose(done chan struct{}) { |
| 1196 | slowTicker := time.Tick(time.Minute) |
| 1197 | lastSnapshotTime := time.Now() |
| 1198 | |
| 1199 | snapshotAfterEntries := x.WorkerConfig.Raft.GetUint64("snapshot-after-entries") |
| 1200 | x.AssertTruef(snapshotAfterEntries > 10, "raft.snapshot-after must be a number greater than 10") |
| 1201 | |
| 1202 | snapshotFrequency := x.WorkerConfig.Raft.GetDuration("snapshot-after-duration") |
| 1203 | |
| 1204 | for { |
| 1205 | select { |
| 1206 | case <-slowTicker: |
| 1207 | // Do these operations asynchronously away from the main Run loop to allow heartbeats to |
| 1208 | // be sent on time. Otherwise, followers would just keep running elections. |
| 1209 | |
| 1210 | glog.V(3).Infof("Size of applyCh: %d", len(n.applyCh)) |
| 1211 | if err := n.updateRaftProgress(); err != nil { |
| 1212 | glog.Errorf("While updating Raft progress: %v", err) |
| 1213 | } |
| 1214 | |
| 1215 | if n.AmLeader() { |
| 1216 | // If leader doesn't have a snapshot, we should create one immediately. This is very |
| 1217 | // useful when you bring up the cluster from bulk loader. If you remove an alpha and |
| 1218 | // add a new alpha, the new follower won't get a snapshot if the leader doesn't have |
| 1219 | // one. |
| 1220 | snap, err := n.Store.Snapshot() |
| 1221 | if err != nil { |
| 1222 | glog.Errorf("While retrieving snapshot from Store: %v\n", err) |
| 1223 | continue |
| 1224 | } |
| 1225 | |
| 1226 | // If we don't have a snapshot, or if there are too many log files in Raft, |
| 1227 | // calculate a new snapshot. |
| 1228 | calculate := raft.IsEmptySnap(snap) || n.Store.NumLogFiles() > 4 |
| 1229 | |
| 1230 | // Only take snapshot if both snapshotFrequency and |
| 1231 | // snapshotAfterEntries requirements are met. If set to 0, |
| 1232 | // we consider duration condition to be disabled. |
| 1233 | if snapshotFrequency == 0 || time.Since(lastSnapshotTime) > snapshotFrequency { |
| 1234 | if chk, err := n.Store.Checkpoint(); err == nil { |
| 1235 | if first, err := n.Store.FirstIndex(); err == nil { |
| 1236 | // Save some cycles by only calculating snapshot if the checkpoint |
| 1237 | // has gone quite a bit further than the first index. |
| 1238 | calculate = calculate || chk >= first+snapshotAfterEntries |
| 1239 | glog.V(3).Infof("Evaluating snapshot first:%d chk:%d (chk-first:%d) "+ |
| 1240 | "snapshotAfterEntries:%d snap:%v", first, chk, chk-first, |
| 1241 | snapshotAfterEntries, calculate) |
| 1242 | } |
| 1243 | } |
| 1244 | } |
| 1245 | |
| 1246 | // We keep track of the applied index in the p directory. Even if we don't take |
| 1247 | // snapshot for a while and let the Raft logs grow and restart, we would not have to |
| 1248 | // run all the log entries, because we can tell Raft.Config to set Applied to that |
| 1249 | // index. |
| 1250 | // This applied index tracking also covers the case when we have a big index |
| 1251 | // rebuild. The rebuild would be tracked just like others and would not need to be |
| 1252 | // replayed after a restart, because the Applied config would let us skip right |
no test coverage detected