MCPcopy
hub / github.com/pingcap/tidb / executeTask

Method executeTask

pkg/executor/index_merge_reader.go:1935–2004  ·  view source on GitHub ↗
(ctx context.Context, task *indexMergeTableTask)

Source from the content-addressed store, hash-verified

1933}
1934
1935func (w *indexMergeTableScanWorker) executeTask(ctx context.Context, task *indexMergeTableTask) error {
1936 tbl := w.indexMergeExec.table
1937 if w.indexMergeExec.partitionTableMode && task.partitionTable != nil {
1938 tbl = task.partitionTable
1939 }
1940 tableReader, err := w.indexMergeExec.buildFinalTableReader(ctx, tbl, task.handles)
1941 if err != nil {
1942 logutil.Logger(ctx).Error("build table reader failed", zap.Error(err))
1943 return err
1944 }
1945 defer func() { terror.Log(exec.Close(tableReader)) }()
1946 task.memTracker = w.memTracker
1947 memUsage := int64(cap(task.handles) * 8)
1948 task.memUsage = memUsage
1949 task.memTracker.Consume(memUsage)
1950 handleCnt := len(task.handles)
1951 task.rows = make([]chunk.Row, 0, handleCnt)
1952 for {
1953 chk := exec.TryNewCacheChunk(tableReader)
1954 err = exec.Next(ctx, tableReader, chk)
1955 if err != nil {
1956 logutil.Logger(ctx).Warn("table reader fetch next chunk failed", zap.Error(err))
1957 return err
1958 }
1959 if chk.NumRows() == 0 {
1960 break
1961 }
1962 memUsage = chk.MemoryUsage()
1963 task.memUsage += memUsage
1964 task.memTracker.Consume(memUsage)
1965 iter := chunk.NewIterator4Chunk(chk)
1966 for row := iter.Begin(); row != iter.End(); row = iter.Next() {
1967 task.rows = append(task.rows, row)
1968 }
1969 }
1970
1971 if w.indexMergeExec.keepOrder {
1972 // Because len(outputOffsets) == tableScan.Schema().Len(),
1973 // so we could use row.GetInt64(idx) to get partition ID here.
1974 // TODO: We could add plannercore.PartitionHandleCols to unify them.
1975 physicalTableIDIdx := -1
1976 for i, c := range w.indexMergeExec.Schema().Columns {
1977 if c.ID == model.ExtraPhysTblID {
1978 physicalTableIDIdx = i
1979 break
1980 }
1981 }
1982 task.rowIdx = make([]int, 0, len(task.rows))
1983 for _, row := range task.rows {
1984 handle, err := w.indexMergeExec.handleCols.BuildHandle(w.indexMergeExec.Ctx().GetSessionVars().StmtCtx, row)
1985 if err != nil {
1986 return err
1987 }
1988 if w.indexMergeExec.partitionTableMode && physicalTableIDIdx != -1 {
1989 handle = kv.NewPartitionHandle(row.GetInt64(physicalTableIDIdx), handle)
1990 }
1991 rowIdx, _ := task.indexOrder.Get(handle)
1992 task.rowIdx = append(task.rowIdx, rowIdx.(int))

Callers 1

pickAndExecTaskMethod · 0.95

Calls 15

BeginMethod · 0.95
EndMethod · 0.95
NextMethod · 0.95
LoggerFunction · 0.92
LogFunction · 0.92
CloseFunction · 0.92
TryNewCacheChunkFunction · 0.92
NextFunction · 0.92
NewIterator4ChunkFunction · 0.92
NewPartitionHandleFunction · 0.92
buildFinalTableReaderMethod · 0.80
WarnMethod · 0.80

Tested by

no test coverage detected