(ctx context.Context, fetchCh <-chan *indexMergeTableTask,
workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{})
| 1189 | } |
| 1190 | |
| 1191 | func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-chan *indexMergeTableTask, |
| 1192 | workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) { |
| 1193 | failpoint.Inject("testIndexMergeResultChCloseEarly", func(_ failpoint.Value) { |
| 1194 | failpoint.Return() |
| 1195 | }) |
| 1196 | memTracker := memory.NewTracker(w.indexMerge.ID(), -1) |
| 1197 | memTracker.AttachTo(w.indexMerge.memTracker) |
| 1198 | defer memTracker.Detach() |
| 1199 | defer close(workCh) |
| 1200 | failpoint.Inject("testIndexMergePanicProcessWorkerUnion", nil) |
| 1201 | |
| 1202 | var pushedLimit *plannercore.PushedDownLimit |
| 1203 | if w.indexMerge.pushedLimit != nil { |
| 1204 | pushedLimit = w.indexMerge.pushedLimit.Clone() |
| 1205 | } |
| 1206 | hMap := kv.NewHandleMap() |
| 1207 | for { |
| 1208 | var ok bool |
| 1209 | var task *indexMergeTableTask |
| 1210 | if pushedLimit != nil && pushedLimit.Count == 0 { |
| 1211 | return |
| 1212 | } |
| 1213 | select { |
| 1214 | case <-ctx.Done(): |
| 1215 | return |
| 1216 | case <-finished: |
| 1217 | return |
| 1218 | case task, ok = <-fetchCh: |
| 1219 | if !ok { |
| 1220 | return |
| 1221 | } |
| 1222 | } |
| 1223 | |
| 1224 | select { |
| 1225 | case err := <-task.doneCh: |
| 1226 | // If got error from partialIndexWorker/partialTableWorker, stop processing. |
| 1227 | if err != nil { |
| 1228 | syncErr(ctx, finished, resultCh, err) |
| 1229 | return |
| 1230 | } |
| 1231 | default: |
| 1232 | } |
| 1233 | start := time.Now() |
| 1234 | handles := task.handles |
| 1235 | fhs := make([]kv.Handle, 0, 8) |
| 1236 | |
| 1237 | memTracker.Consume(int64(cap(task.handles) * 8)) |
| 1238 | for _, h := range handles { |
| 1239 | if w.indexMerge.partitionTableMode { |
| 1240 | if _, ok := h.(kv.PartitionHandle); !ok { |
| 1241 | h = kv.NewPartitionHandle(task.partitionTable.GetPhysicalID(), h) |
| 1242 | } |
| 1243 | } |
| 1244 | if _, ok := hMap.Get(h); !ok { |
| 1245 | fhs = append(fhs, h) |
| 1246 | hMap.Set(h, true) |
| 1247 | } |
| 1248 | } |
no test coverage detected