executeTask executes the table look up tasks. We will construct a table reader and send request by handles. Then we hold the returning rows and finish this task.
(ctx context.Context, task *lookupTableTask)
| 1859 | // executeTask executes the table look up tasks. We will construct a table reader and send request by handles. |
| 1860 | // Then we hold the returning rows and finish this task. |
| 1861 | func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) error { |
| 1862 | tableReader, err := w.idxLookup.buildTableReader(ctx, task) |
| 1863 | task.buildDoneTime = time.Now() |
| 1864 | if err != nil { |
| 1865 | if ctx.Err() != context.Canceled { |
| 1866 | logutil.Logger(ctx).Error("build table reader failed", zap.Error(err)) |
| 1867 | } |
| 1868 | return err |
| 1869 | } |
| 1870 | defer func() { terror.Log(exec.Close(tableReader)) }() |
| 1871 | |
| 1872 | if w.checkIndexValue != nil { |
| 1873 | return w.compareData(ctx, task, tableReader) |
| 1874 | } |
| 1875 | |
| 1876 | { |
| 1877 | task.memTracker = w.memTracker |
| 1878 | memUsage := int64(cap(task.handles))*size.SizeOfInterface + tableReader.memUsage() |
| 1879 | for _, h := range task.handles { |
| 1880 | memUsage += int64(h.MemUsage()) |
| 1881 | } |
| 1882 | if task.indexOrder != nil { |
| 1883 | memUsage += task.indexOrder.MemUsage() |
| 1884 | } |
| 1885 | if task.duplicatedIndexOrder != nil { |
| 1886 | memUsage += task.duplicatedIndexOrder.MemUsage() |
| 1887 | } |
| 1888 | memUsage += task.idxRows.MemoryUsage() |
| 1889 | task.memUsage = memUsage |
| 1890 | task.memTracker.Consume(memUsage) |
| 1891 | } |
| 1892 | handleCnt := len(task.handles) |
| 1893 | task.rows = make([]chunk.Row, 0, handleCnt) |
| 1894 | for { |
| 1895 | chk := exec.TryNewCacheChunk(tableReader) |
| 1896 | err = exec.Next(ctx, tableReader, chk) |
| 1897 | if err != nil { |
| 1898 | if ctx.Err() != context.Canceled { |
| 1899 | logutil.Logger(ctx).Warn("table reader fetch next chunk failed", zap.Error(err)) |
| 1900 | } |
| 1901 | return err |
| 1902 | } |
| 1903 | if chk.NumRows() == 0 { |
| 1904 | break |
| 1905 | } |
| 1906 | { |
| 1907 | memUsage := chk.MemoryUsage() |
| 1908 | task.memUsage += memUsage |
| 1909 | task.memTracker.Consume(memUsage) |
| 1910 | } |
| 1911 | iter := chunk.NewIterator4Chunk(chk) |
| 1912 | for row := iter.Begin(); row != iter.End(); row = iter.Next() { |
| 1913 | task.rows = append(task.rows, row) |
| 1914 | } |
| 1915 | } |
| 1916 | |
| 1917 | defer trace.StartRegion(ctx, "IndexLookUpTableCompute").End() |
| 1918 | { |
no test coverage detected