(ctx context.Context, task *indexMergeTableTask)
| 1933 | } |
| 1934 | |
| 1935 | func (w *indexMergeTableScanWorker) executeTask(ctx context.Context, task *indexMergeTableTask) error { |
| 1936 | tbl := w.indexMergeExec.table |
| 1937 | if w.indexMergeExec.partitionTableMode && task.partitionTable != nil { |
| 1938 | tbl = task.partitionTable |
| 1939 | } |
| 1940 | tableReader, err := w.indexMergeExec.buildFinalTableReader(ctx, tbl, task.handles) |
| 1941 | if err != nil { |
| 1942 | logutil.Logger(ctx).Error("build table reader failed", zap.Error(err)) |
| 1943 | return err |
| 1944 | } |
| 1945 | defer func() { terror.Log(exec.Close(tableReader)) }() |
| 1946 | task.memTracker = w.memTracker |
| 1947 | memUsage := int64(cap(task.handles) * 8) |
| 1948 | task.memUsage = memUsage |
| 1949 | task.memTracker.Consume(memUsage) |
| 1950 | handleCnt := len(task.handles) |
| 1951 | task.rows = make([]chunk.Row, 0, handleCnt) |
| 1952 | for { |
| 1953 | chk := exec.TryNewCacheChunk(tableReader) |
| 1954 | err = exec.Next(ctx, tableReader, chk) |
| 1955 | if err != nil { |
| 1956 | logutil.Logger(ctx).Warn("table reader fetch next chunk failed", zap.Error(err)) |
| 1957 | return err |
| 1958 | } |
| 1959 | if chk.NumRows() == 0 { |
| 1960 | break |
| 1961 | } |
| 1962 | memUsage = chk.MemoryUsage() |
| 1963 | task.memUsage += memUsage |
| 1964 | task.memTracker.Consume(memUsage) |
| 1965 | iter := chunk.NewIterator4Chunk(chk) |
| 1966 | for row := iter.Begin(); row != iter.End(); row = iter.Next() { |
| 1967 | task.rows = append(task.rows, row) |
| 1968 | } |
| 1969 | } |
| 1970 | |
| 1971 | if w.indexMergeExec.keepOrder { |
| 1972 | // Since len(outputOffsets) == tableScan.Schema().Len(), |
| 1973 | // we can use row.GetInt64(idx) to get the partition ID here. |
| 1974 | // TODO: We could add plannercore.PartitionHandleCols to unify them. |
| 1975 | physicalTableIDIdx := -1 |
| 1976 | for i, c := range w.indexMergeExec.Schema().Columns { |
| 1977 | if c.ID == model.ExtraPhysTblID { |
| 1978 | physicalTableIDIdx = i |
| 1979 | break |
| 1980 | } |
| 1981 | } |
| 1982 | task.rowIdx = make([]int, 0, len(task.rows)) |
| 1983 | for _, row := range task.rows { |
| 1984 | handle, err := w.indexMergeExec.handleCols.BuildHandle(w.indexMergeExec.Ctx().GetSessionVars().StmtCtx, row) |
| 1985 | if err != nil { |
| 1986 | return err |
| 1987 | } |
| 1988 | if w.indexMergeExec.partitionTableMode && physicalTableIDIdx != -1 { |
| 1989 | handle = kv.NewPartitionHandle(row.GetInt64(physicalTableIDIdx), handle) |
| 1990 | } |
| 1991 | rowIdx, _ := task.indexOrder.Get(handle) |
| 1992 | task.rowIdx = append(task.rowIdx, rowIdx.(int)) |
no test coverage detected