MCPcopy
hub / github.com/pingcap/tidb / batchCheckAndInsert

Method batchCheckAndInsert

pkg/executor/insert_common.go:1200–1334  ·  view source on GitHub ↗

batchCheckAndInsert checks rows with duplicate errors. All duplicate rows will be ignored and appended as duplicate warnings.

(
	ctx context.Context, rows [][]types.Datum,
	addRecord func(ctx context.Context, row []types.Datum, dupKeyCheck table.DupKeyCheckMode) error,
	replace bool,
)

Source from the content-addressed store, hash-verified

1198// batchCheckAndInsert checks rows with duplicate errors.
1199// All duplicate rows will be ignored and appended as duplicate warnings.
1200func (e *InsertValues) batchCheckAndInsert(
1201 ctx context.Context, rows [][]types.Datum,
1202 addRecord func(ctx context.Context, row []types.Datum, dupKeyCheck table.DupKeyCheckMode) error,
1203 replace bool,
1204) error {
1205 defer tracing.StartRegion(ctx, "InsertValues.batchCheckAndInsert").End()
1206 start := time.Now()
1207 // Get keys need to be checked.
1208 toBeCheckedRows, err := getKeysNeedCheck(e.Ctx(), e.Table, rows)
1209 if err != nil {
1210 return err
1211 }
1212
1213 txn, err := e.Ctx().Txn(true)
1214 if err != nil {
1215 return err
1216 }
1217 setOptionForTopSQL(e.Ctx().GetSessionVars().StmtCtx, txn)
1218 sc := e.Ctx().GetSessionVars().StmtCtx
1219 for _, fkc := range e.fkChecks {
1220 err = fkc.checkRows(ctx, sc, txn, toBeCheckedRows)
1221 if err != nil {
1222 return err
1223 }
1224 }
1225 prefetchStart := time.Now()
1226 // Fill cache using BatchGet, the following Get requests don't need to visit TiKV.
1227 // Temporary table need not to do prefetch because its all data are stored in the memory.
1228 if e.Table.Meta().TempTableType == model.TempTableNone {
1229 if _, err = prefetchUniqueIndices(ctx, txn, toBeCheckedRows); err != nil {
1230 return err
1231 }
1232 }
1233
1234 if e.stats != nil {
1235 e.stats.FKCheckTime += prefetchStart.Sub(start)
1236 e.stats.Prefetch += time.Since(prefetchStart)
1237 }
1238
1239 // append warnings and get no duplicated error rows
1240 for i, r := range toBeCheckedRows {
1241 if r.ignored {
1242 continue
1243 }
1244 if r.handleKey != nil {
1245 _, err := txn.Get(ctx, r.handleKey.newKey)
1246 if err == nil {
1247 if !replace {
1248 e.Ctx().GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr)
1249 if txnCtx := e.Ctx().GetSessionVars().TxnCtx; txnCtx.IsPessimistic &&
1250 e.Ctx().GetSessionVars().LockUnchangedKeys {
1251 // lock duplicated row key on insert-ignore
1252 txnCtx.AddUnchangedKeyForLock(r.handleKey.newKey, false)
1253 }
1254 continue
1255 }
1256 handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey)
1257 if err != nil {

Callers 2

execMethod · 0.80

Calls 15

removeRowMethod · 0.95
handleDuplicateKeyMethod · 0.95
StartRegionFunction · 0.92
DecodeRowKeyFunction · 0.92
IsErrNotFoundFunction · 0.92
IsTempIndexKeyFunction · 0.92
TempIndexKey2IndexKeyFunction · 0.92
getKeysNeedCheckFunction · 0.85
setOptionForTopSQLFunction · 0.85
prefetchUniqueIndicesFunction · 0.85
checkRowsMethod · 0.80

Tested by

no test coverage detected