Snapshot is the process of writing the current in-memory content of a dimension table's live store to disk in order to: 1. Facilitate recovery during server bootstrap. 2. Purge stale redo logs.
(table string, shardID int, reporter SnapshotJobDetailReporter)
| 23 | // 1.Facilitate recovery during server bootstrap. |
| 24 | // 2.Purge stale redo logs. |
| 25 | func (m *memStoreImpl) Snapshot(table string, shardID int, reporter SnapshotJobDetailReporter) error { |
| 26 | snapshotTimer := utils.GetReporter(table, shardID).GetTimer(utils.SnapshotTimingTotal) |
| 27 | start := utils.Now() |
| 28 | jobKey := getIdentifier(table, shardID, memCom.SnapshotJobType) |
| 29 | |
| 30 | defer func() { |
| 31 | duration := utils.Now().Sub(start) |
| 32 | snapshotTimer.Record(duration) |
| 33 | reporter(jobKey, func(status *SnapshotJobDetail) { |
| 34 | status.LastDuration = duration |
| 35 | }) |
| 36 | utils.GetReporter(table, shardID). |
| 37 | GetCounter(utils.SnapshotCount).Inc(1) |
| 38 | }() |
| 39 | |
| 40 | shard, err := m.GetTableShard(table, shardID) |
| 41 | if err != nil { |
| 42 | utils.GetLogger().With("table", table, "shard", shardID, "error", err).Warn("Failed to find shard, is it deleted?") |
| 43 | return nil |
| 44 | } |
| 45 | |
| 46 | defer shard.Users.Done() |
| 47 | utils.GetLogger().With( |
| 48 | "job", "snapshot", |
| 49 | "table", table).Infof("Creating snapshot") |
| 50 | |
| 51 | snapshotMgr := shard.LiveStore.SnapshotManager |
| 52 | // keep the current redofile and offset |
| 53 | redoFile, batchOffset, numMutations, lastReadRecord := snapshotMgr.StartSnapshot() |
| 54 | |
| 55 | reporter(jobKey, func(status *SnapshotJobDetail) { |
| 56 | status.RedologFile = redoFile |
| 57 | status.BatchOffset = batchOffset |
| 58 | status.Stage = SnapshotSnapshot |
| 59 | }) |
| 60 | |
| 61 | if numMutations > 0 { |
| 62 | if err = m.createSnapshot(shard, redoFile, batchOffset); err != nil { |
| 63 | return err |
| 64 | } |
| 65 | } |
| 66 | |
| 67 | // checkpoint snapshot progress |
| 68 | snapshotMgr.Done(redoFile, batchOffset, numMutations, lastReadRecord) |
| 69 | |
| 70 | reporter(jobKey, func(status *SnapshotJobDetail) { |
| 71 | status.Stage = SnapshotCleanup |
| 72 | }) |
| 73 | |
| 74 | shard.cleanOldSnapshotAndLogs(redoFile, batchOffset) |
| 75 | |
| 76 | reporter(jobKey, func(status *SnapshotJobDetail) { |
| 77 | status.Stage = SnapshotComplete |
| 78 | }) |
| 79 | |
| 80 | utils.GetLogger().With( |
| 81 | "job", "snapshot", |
| 82 | "table", table).Infof("Snapshot done") |
nothing calls this directly
no test coverage detected