| 165 | } |
| 166 | |
| 167 | func (s *Store) Open(ctx context.Context) error { |
| 168 | if err := s.levels.Validate(); err != nil { |
| 169 | return err |
| 170 | } |
| 171 | |
| 172 | g, ctx := errgroup.WithContext(ctx) |
| 173 | g.SetLimit(50) |
| 174 | for _, db := range s.dbs { |
| 175 | db := db |
| 176 | g.Go(func() error { |
| 177 | select { |
| 178 | case <-ctx.Done(): |
| 179 | return ctx.Err() |
| 180 | default: |
| 181 | return db.Open() |
| 182 | } |
| 183 | }) |
| 184 | } |
| 185 | if err := g.Wait(); err != nil { |
| 186 | return err |
| 187 | } |
| 188 | |
| 189 | // Start monitors for compactions & snapshots. |
| 190 | if s.CompactionMonitorEnabled { |
| 191 | // Start compaction monitors for all levels except L0. |
| 192 | for _, lvl := range s.levels { |
| 193 | lvl := lvl |
| 194 | if lvl.Level == 0 { |
| 195 | continue |
| 196 | } |
| 197 | |
| 198 | s.wg.Add(1) |
| 199 | go func() { |
| 200 | defer s.wg.Done() |
| 201 | s.monitorCompactionLevel(s.ctx, lvl) |
| 202 | }() |
| 203 | } |
| 204 | |
| 205 | // Start snapshot monitor for snapshots. |
| 206 | s.wg.Add(1) |
| 207 | go func() { |
| 208 | defer s.wg.Done() |
| 209 | s.monitorCompactionLevel(s.ctx, s.SnapshotLevel()) |
| 210 | }() |
| 211 | } |
| 212 | |
| 213 | if s.L0Retention > 0 && s.L0RetentionCheckInterval > 0 { |
| 214 | s.wg.Add(1) |
| 215 | go func() { |
| 216 | defer s.wg.Done() |
| 217 | s.monitorL0Retention(s.ctx) |
| 218 | }() |
| 219 | } |
| 220 | |
| 221 | // Start heartbeat monitor if any database has heartbeat configured. |
| 222 | s.startHeartbeatMonitorIfNeeded() |
| 223 | |
| 224 | // Start validation monitor if configured. |