batchCheckAndInsert checks rows with duplicate errors. All duplicate rows will be ignored and appended as duplicate warnings.
( ctx context.Context, rows [][]types.Datum, addRecord func(ctx context.Context, row []types.Datum, dupKeyCheck table.DupKeyCheckMode) error, replace bool, )
| 1198 | // batchCheckAndInsert checks rows with duplicate errors. |
| 1199 | // All duplicate rows will be ignored and appended as duplicate warnings. |
| 1200 | func (e *InsertValues) batchCheckAndInsert( |
| 1201 | ctx context.Context, rows [][]types.Datum, |
| 1202 | addRecord func(ctx context.Context, row []types.Datum, dupKeyCheck table.DupKeyCheckMode) error, |
| 1203 | replace bool, |
| 1204 | ) error { |
| 1205 | defer tracing.StartRegion(ctx, "InsertValues.batchCheckAndInsert").End() |
| 1206 | start := time.Now() |
| 1207 | // Get keys need to be checked. |
| 1208 | toBeCheckedRows, err := getKeysNeedCheck(e.Ctx(), e.Table, rows) |
| 1209 | if err != nil { |
| 1210 | return err |
| 1211 | } |
| 1212 | |
| 1213 | txn, err := e.Ctx().Txn(true) |
| 1214 | if err != nil { |
| 1215 | return err |
| 1216 | } |
| 1217 | setOptionForTopSQL(e.Ctx().GetSessionVars().StmtCtx, txn) |
| 1218 | sc := e.Ctx().GetSessionVars().StmtCtx |
| 1219 | for _, fkc := range e.fkChecks { |
| 1220 | err = fkc.checkRows(ctx, sc, txn, toBeCheckedRows) |
| 1221 | if err != nil { |
| 1222 | return err |
| 1223 | } |
| 1224 | } |
| 1225 | prefetchStart := time.Now() |
| 1226 | // Fill cache using BatchGet, the following Get requests don't need to visit TiKV. |
| 1227 | // Temporary table need not to do prefetch because its all data are stored in the memory. |
| 1228 | if e.Table.Meta().TempTableType == model.TempTableNone { |
| 1229 | if _, err = prefetchUniqueIndices(ctx, txn, toBeCheckedRows); err != nil { |
| 1230 | return err |
| 1231 | } |
| 1232 | } |
| 1233 | |
| 1234 | if e.stats != nil { |
| 1235 | e.stats.FKCheckTime += prefetchStart.Sub(start) |
| 1236 | e.stats.Prefetch += time.Since(prefetchStart) |
| 1237 | } |
| 1238 | |
| 1239 | // append warnings and get no duplicated error rows |
| 1240 | for i, r := range toBeCheckedRows { |
| 1241 | if r.ignored { |
| 1242 | continue |
| 1243 | } |
| 1244 | if r.handleKey != nil { |
| 1245 | _, err := txn.Get(ctx, r.handleKey.newKey) |
| 1246 | if err == nil { |
| 1247 | if !replace { |
| 1248 | e.Ctx().GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr) |
| 1249 | if txnCtx := e.Ctx().GetSessionVars().TxnCtx; txnCtx.IsPessimistic && |
| 1250 | e.Ctx().GetSessionVars().LockUnchangedKeys { |
| 1251 | // lock duplicated row key on insert-ignore |
| 1252 | txnCtx.AddUnchangedKeyForLock(r.handleKey.newKey, false) |
| 1253 | } |
| 1254 | continue |
| 1255 | } |
| 1256 | handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey) |
| 1257 | if err != nil { |
no test coverage detected