loadSnapshots load snapshots for dimension tables
()
| 148 | |
| 149 | // loadSnapshots load snapshots for dimension tables |
| 150 | func (m *memStoreImpl) loadSnapshots() { |
| 151 | utils.GetLogger().Info("Start loading snapshots for all table shards") |
| 152 | var wg sync.WaitGroup |
| 153 | for table, tableSchema := range m.TableSchemas { |
| 154 | if tableSchema.Schema.IsFactTable { |
| 155 | continue |
| 156 | } |
| 157 | wg.Add(1) |
| 158 | go func(tableName string) { |
| 159 | tableShards := m.TableShards[tableName] |
| 160 | for _, shard := range tableShards { |
| 161 | utils.GetLogger().With( |
| 162 | "job", "snapshot_load", |
| 163 | "table", shard.Schema.Schema.Name, |
| 164 | "shard", shard.ShardID). |
| 165 | Info("Loading snapshots") |
| 166 | if err := shard.LoadSnapshot(); err != nil { |
| 167 | utils.GetLogger().With( |
| 168 | "job", "snapshot_load", |
| 169 | "table", shard.Schema.Schema.Name, |
| 170 | "shard", shard.ShardID).Panic(err) |
| 171 | } |
| 172 | utils.GetLogger().With( |
| 173 | "job", "snapshot_load", |
| 174 | "table", shard.Schema.Schema.Name, |
| 175 | "shard", shard.ShardID). |
| 176 | Info("Loading snapshots done") |
| 177 | } |
| 178 | wg.Done() |
| 179 | }(table) |
| 180 | |
| 181 | } |
| 182 | wg.Wait() |
| 183 | utils.GetLogger().Info("Finish loading snapshots for all table shards") |
| 184 | } |
| 185 | |
| 186 | // playRedoLogs replay redo logs for all tables in parallel, and then start the data ingestion |
| 187 | func (m *memStoreImpl) playRedoLogs() { |