MCPcopy
hub / github.com/pingcap/tidb / executeTask

Method executeTask

pkg/executor/distsql.go:1861–1973  ·  view source on GitHub ↗

executeTask executes the table look up tasks. We will construct a table reader and send request by handles. Then we hold the returning rows and finish this task.

(ctx context.Context, task *lookupTableTask)

Source from the content-addressed store, hash-verified

1859// executeTask executes the table look up tasks. We will construct a table reader and send request by handles.
1860// Then we hold the returning rows and finish this task.
1861func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) error {
1862 tableReader, err := w.idxLookup.buildTableReader(ctx, task)
1863 task.buildDoneTime = time.Now()
1864 if err != nil {
1865 if ctx.Err() != context.Canceled {
1866 logutil.Logger(ctx).Error("build table reader failed", zap.Error(err))
1867 }
1868 return err
1869 }
1870 defer func() { terror.Log(exec.Close(tableReader)) }()
1871
1872 if w.checkIndexValue != nil {
1873 return w.compareData(ctx, task, tableReader)
1874 }
1875
1876 {
1877 task.memTracker = w.memTracker
1878 memUsage := int64(cap(task.handles))*size.SizeOfInterface + tableReader.memUsage()
1879 for _, h := range task.handles {
1880 memUsage += int64(h.MemUsage())
1881 }
1882 if task.indexOrder != nil {
1883 memUsage += task.indexOrder.MemUsage()
1884 }
1885 if task.duplicatedIndexOrder != nil {
1886 memUsage += task.duplicatedIndexOrder.MemUsage()
1887 }
1888 memUsage += task.idxRows.MemoryUsage()
1889 task.memUsage = memUsage
1890 task.memTracker.Consume(memUsage)
1891 }
1892 handleCnt := len(task.handles)
1893 task.rows = make([]chunk.Row, 0, handleCnt)
1894 for {
1895 chk := exec.TryNewCacheChunk(tableReader)
1896 err = exec.Next(ctx, tableReader, chk)
1897 if err != nil {
1898 if ctx.Err() != context.Canceled {
1899 logutil.Logger(ctx).Warn("table reader fetch next chunk failed", zap.Error(err))
1900 }
1901 return err
1902 }
1903 if chk.NumRows() == 0 {
1904 break
1905 }
1906 {
1907 memUsage := chk.MemoryUsage()
1908 task.memUsage += memUsage
1909 task.memTracker.Consume(memUsage)
1910 }
1911 iter := chunk.NewIterator4Chunk(chk)
1912 for row := iter.Begin(); row != iter.End(); row = iter.Next() {
1913 task.rows = append(task.rows, row)
1914 }
1915 }
1916
1917 defer trace.StartRegion(ctx, "IndexLookUpTableCompute").End()
1918 {

Callers 1

pickAndExecTaskMethod · 0.95

Calls 15

compareDataMethod · 0.95
BeginMethod · 0.95
EndMethod · 0.95
NextMethod · 0.95
SetMethod · 0.95
LoggerFunction · 0.92
LogFunction · 0.92
CloseFunction · 0.92
TryNewCacheChunkFunction · 0.92
NextFunction · 0.92
NewIterator4ChunkFunction · 0.92
HasCancelledFunction · 0.92

Tested by

no test coverage detected