CreateDatabases creates databases. If the client has the db pool, it would create it.
(ctx context.Context, dbs []*metautil.Database)
| 1006 | |
| 1007 | // CreateDatabases creates databases. If the client has the db pool, it would create it. |
| 1008 | func (rc *SnapClient) CreateDatabases(ctx context.Context, dbs []*metautil.Database) error { |
| 1009 | if rc.IsSkipCreateSQL() { |
| 1010 | log.Info("skip create database") |
| 1011 | return nil |
| 1012 | } |
| 1013 | |
| 1014 | if len(rc.dbPool) == 0 { |
| 1015 | log.Info("create databases sequentially") |
| 1016 | for _, db := range dbs { |
| 1017 | err := rc.db.CreateDatabase(ctx, db.Info, rc.supportPolicy, rc.policyMap) |
| 1018 | if err != nil { |
| 1019 | return errors.Trace(err) |
| 1020 | } |
| 1021 | } |
| 1022 | return nil |
| 1023 | } |
| 1024 | |
| 1025 | log.Info("create databases in db pool", zap.Int("pool size", len(rc.dbPool)), zap.Int("number of db", len(dbs))) |
| 1026 | eg, ectx := errgroup.WithContext(ctx) |
| 1027 | workers := tidbutil.NewWorkerPool(uint(len(rc.dbPool)), "DB DDL workers") |
| 1028 | for _, db_ := range dbs { |
| 1029 | db := db_ |
| 1030 | workers.ApplyWithIDInErrorGroup(eg, func(id uint64) error { |
| 1031 | conn := rc.dbPool[id%uint64(len(rc.dbPool))] |
| 1032 | return conn.CreateDatabase(ectx, db.Info, rc.supportPolicy, rc.policyMap) |
| 1033 | }) |
| 1034 | } |
| 1035 | return eg.Wait() |
| 1036 | } |
| 1037 | |
| 1038 | // generateRebasedTables generate a map[UniqueTableName]bool to represent tables that haven't updated table info. |
| 1039 | // there are two situations: |