MCPcopy
hub / github.com/pingcap/tidb / fetchLoopUnion

Method fetchLoopUnion

pkg/executor/index_merge_reader.go:1191–1296  ·  view source on GitHub ↗
(ctx context.Context, fetchCh <-chan *indexMergeTableTask,
	workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{})

Source from the content-addressed store, hash-verified

1189}
1190
1191func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-chan *indexMergeTableTask,
1192 workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) {
1193 failpoint.Inject("testIndexMergeResultChCloseEarly", func(_ failpoint.Value) {
1194 failpoint.Return()
1195 })
1196 memTracker := memory.NewTracker(w.indexMerge.ID(), -1)
1197 memTracker.AttachTo(w.indexMerge.memTracker)
1198 defer memTracker.Detach()
1199 defer close(workCh)
1200 failpoint.Inject("testIndexMergePanicProcessWorkerUnion", nil)
1201
1202 var pushedLimit *plannercore.PushedDownLimit
1203 if w.indexMerge.pushedLimit != nil {
1204 pushedLimit = w.indexMerge.pushedLimit.Clone()
1205 }
1206 hMap := kv.NewHandleMap()
1207 for {
1208 var ok bool
1209 var task *indexMergeTableTask
1210 if pushedLimit != nil && pushedLimit.Count == 0 {
1211 return
1212 }
1213 select {
1214 case <-ctx.Done():
1215 return
1216 case <-finished:
1217 return
1218 case task, ok = <-fetchCh:
1219 if !ok {
1220 return
1221 }
1222 }
1223
1224 select {
1225 case err := <-task.doneCh:
1226 // If got error from partialIndexWorker/partialTableWorker, stop processing.
1227 if err != nil {
1228 syncErr(ctx, finished, resultCh, err)
1229 return
1230 }
1231 default:
1232 }
1233 start := time.Now()
1234 handles := task.handles
1235 fhs := make([]kv.Handle, 0, 8)
1236
1237 memTracker.Consume(int64(cap(task.handles) * 8))
1238 for _, h := range handles {
1239 if w.indexMerge.partitionTableMode {
1240 if _, ok := h.(kv.PartitionHandle); !ok {
1241 h = kv.NewPartitionHandle(task.partitionTable.GetPhysicalID(), h)
1242 }
1243 }
1244 if _, ok := hMap.Get(h); !ok {
1245 fhs = append(fhs, h)
1246 hMap.Set(h, true)
1247 }
1248 }

Callers 1

Calls 15

AttachToMethod · 0.95
DetachMethod · 0.95
ConsumeMethod · 0.95
GetMethod · 0.95
SetMethod · 0.95
NewTrackerFunction · 0.92
NewHandleMapFunction · 0.92
NewPartitionHandleFunction · 0.92
syncErrFunction · 0.85
pushedLimitCountingDownFunction · 0.85
IDMethod · 0.65
CloneMethod · 0.65

Tested by

no test coverage detected