( ctx context.Context, s storage.ExternalStorage, filterOutFn func(restoreCommitTs, restoreStartTs uint64) bool, executionFn func(ctx context.Context, filename string, restoreCommitTs, restoreStartTs, rewriteTs uint64, tableIds, dbIds []int64) error, )
| 162 | } |
| 163 | |
| 164 | func fastWalkLogRestoreTableIDsBlocklistFile( |
| 165 | ctx context.Context, |
| 166 | s storage.ExternalStorage, |
| 167 | filterOutFn func(restoreCommitTs, restoreStartTs uint64) bool, |
| 168 | executionFn func(ctx context.Context, filename string, restoreCommitTs, restoreStartTs, rewriteTs uint64, tableIds, dbIds []int64) error, |
| 169 | ) error { |
| 170 | filenames := make([]string, 0) |
| 171 | if err := s.WalkDir(ctx, &storage.WalkOption{SubDir: logRestoreTableIDBlocklistFilePrefix}, func(path string, _ int64) error { |
| 172 | restoreCommitTs, restoreStartTs, parsed := parseLogRestoreTableIDsBlocklistFileName(path) |
| 173 | if parsed { |
| 174 | if filterOutFn(restoreCommitTs, restoreStartTs) { |
| 175 | return nil |
| 176 | } |
| 177 | } |
| 178 | filenames = append(filenames, path) |
| 179 | return nil |
| 180 | }); err != nil { |
| 181 | return errors.Trace(err) |
| 182 | } |
| 183 | workerpool := tidbutil.NewWorkerPool(8, "walk dir log restore table IDs blocklist files") |
| 184 | eg, ectx := errgroup.WithContext(ctx) |
| 185 | for _, filename := range filenames { |
| 186 | if ectx.Err() != nil { |
| 187 | break |
| 188 | } |
| 189 | workerpool.ApplyOnErrorGroup(eg, func() error { |
| 190 | data, err := s.ReadFile(ectx, filename) |
| 191 | if err != nil { |
| 192 | return errors.Trace(err) |
| 193 | } |
| 194 | blocklistFile, err := unmarshalLogRestoreTableIDsBlocklistFile(data) |
| 195 | if err != nil { |
| 196 | return errors.Trace(err) |
| 197 | } |
| 198 | if filterOutFn(blocklistFile.RestoreCommitTs, blocklistFile.RestoreStartTs) { |
| 199 | return nil |
| 200 | } |
| 201 | err = executionFn(ectx, filename, blocklistFile.RestoreCommitTs, blocklistFile.RestoreStartTs, blocklistFile.RewriteTs, blocklistFile.TableIds, blocklistFile.DbIds) |
| 202 | return errors.Trace(err) |
| 203 | }) |
| 204 | } |
| 205 | return errors.Trace(eg.Wait()) |
| 206 | } |
| 207 | |
| 208 | // CheckTableTrackerContainsTableIDsFromBlocklistFiles checks whether pitr id tracker contains the filtered table IDs from blocklist file. |
| 209 | func CheckTableTrackerContainsTableIDsFromBlocklistFiles( |
no test coverage detected