(ranges []kv.KeyRange, fn func(handle kv.Handle, value []byte) error)
| 298 | } |
| 299 | |
| 300 | func (s *tableRegionSampler) scanFirstKVForEachRange(ranges []kv.KeyRange, |
| 301 | fn func(handle kv.Handle, value []byte) error) error { |
| 302 | ver := kv.Version{Ver: s.startTS} |
| 303 | snap := s.ctx.GetStore().GetSnapshot(ver) |
| 304 | setOptionForTopSQL(s.ctx.GetSessionVars().StmtCtx, snap) |
| 305 | concurrency := s.ctx.GetSessionVars().ExecutorConcurrency |
| 306 | if len(ranges) < concurrency { |
| 307 | concurrency = len(ranges) |
| 308 | } |
| 309 | |
| 310 | fetchers := make([]*sampleFetcher, concurrency) |
| 311 | for i := 0; i < concurrency; i++ { |
| 312 | fetchers[i] = &sampleFetcher{ |
| 313 | workerID: i, |
| 314 | concurrency: concurrency, |
| 315 | kvChan: make(chan *sampleKV), |
| 316 | snapshot: snap, |
| 317 | ranges: ranges, |
| 318 | } |
| 319 | go fetchers[i].run() |
| 320 | } |
| 321 | syncer := sampleSyncer{ |
| 322 | fetchers: fetchers, |
| 323 | totalCount: len(ranges), |
| 324 | consumeFn: fn, |
| 325 | } |
| 326 | return syncer.sync() |
| 327 | } |
| 328 | |
| 329 | func (s *tableRegionSampler) resetRowMap() { |
| 330 | if s.rowMap == nil { |
no test coverage detected