RunRestore starts a restore task inside the current goroutine.
Signature: RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) (restoreErr error)
| 869 | |
| 870 | // RunRestore starts a restore task inside the current goroutine. |
| 871 | func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) (restoreErr error) { |
| 872 | etcdCLI, err := dialEtcdWithCfg(c, cfg.Config) |
| 873 | if err != nil { |
| 874 | return err |
| 875 | } |
| 876 | defer func() { |
| 877 | if err := etcdCLI.Close(); err != nil { |
| 878 | log.Error("failed to close the etcd client", zap.Error(err)) |
| 879 | } |
| 880 | }() |
| 881 | logTaskBackend, err := checkConflictingLogBackup(c, cfg, IsStreamRestore(cmdName), etcdCLI) |
| 882 | if err != nil { |
| 883 | return errors.Annotate(err, "failed to check task exists") |
| 884 | } |
| 885 | closeF, err := registerTaskToPD(c, etcdCLI) |
| 886 | if err != nil { |
| 887 | return errors.Annotate(err, "failed to register task to pd") |
| 888 | } |
| 889 | defer func() { |
| 890 | _ = closeF(c) |
| 891 | }() |
| 892 | |
| 893 | config.UpdateGlobal(func(conf *config.Config) { |
| 894 | conf.KeyspaceName = cfg.KeyspaceName |
| 895 | }) |
| 896 | |
| 897 | // TODO: remove version checker from `NewMgr` |
| 898 | mgr, err := NewMgr(c, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, true, conn.NormalVersionChecker) |
| 899 | if err != nil { |
| 900 | return errors.Trace(err) |
| 901 | } |
| 902 | defer mgr.Close() |
| 903 | defer cfg.CloseCheckpointMetaManager() |
| 904 | defer func() { |
| 905 | if logTaskBackend == nil || restoreErr != nil || cfg.PiTRTableTracker == nil { |
| 906 | return |
| 907 | } |
| 908 | restoreCommitTs, err := restore.GetTSWithRetry(c, mgr.GetPDClient()) |
| 909 | if err != nil { |
| 910 | restoreErr = err |
| 911 | return |
| 912 | } |
| 913 | if cfg.tableMappingManager == nil { |
| 914 | log.Error("tableMappingManager is nil, blocklist will contain no IDs") |
| 915 | restoreErr = errors.New("tableMappingManager is nil") |
| 916 | return |
| 917 | } |
| 918 | |
| 919 | // Extract downstream IDs from tableMappingManager |
| 920 | // ApplyFilterToDBReplaceMap has already filtered the DBReplaceMap based on PiTRTableTracker, |
| 921 | // so we can directly iterate through it and collect non-filtered IDs |
| 922 | downstreamTableIds := make(map[int64]struct{}) |
| 923 | var downstreamDbIds []int64 |
| 924 | |
| 925 | // Iterate through DBReplaceMap which has already been filtered by ApplyFilterToDBReplaceMap |
| 926 | for _, dbReplace := range cfg.tableMappingManager.DBReplaceMap { |
| 927 | if dbReplace.FilteredOut { |
| 928 | continue |