| 271 | } |
| 272 | |
| 273 | func (d *dataNode) startTableAdditionWatch() { |
| 274 | shardOwnershipEvents, done, err := d.metaStore.WatchShardOwnershipEvents() |
| 275 | if err != nil { |
| 276 | utils.GetLogger().With("error", err.Error()).Fatal("failed to watch schema addition") |
| 277 | } |
| 278 | |
| 279 | for { |
| 280 | select { |
| 281 | case <-d.close: |
| 282 | return |
| 283 | case event, ok := <-shardOwnershipEvents: |
| 284 | if !ok { |
| 285 | close(done) |
| 286 | return |
| 287 | } |
| 288 | d.addTable(event.TableName) |
| 289 | done <- struct{}{} |
| 290 | } |
| 291 | } |
| 292 | } |
| 293 | |
| 294 | func (d *dataNode) startActiveTopologyWatch() { |
| 295 | for { |