Read rows from a batch group and write to memStore. Batch id = 0 is for records to be inserted.
(columnDeletions []bool, upsertBatch *common.UpsertBatch, batchID int32, records []recordInfo, forUpdate bool)
| 358 | |
| 359 | // Read rows from a batch group and write to memStore. Batch id = 0 is for records to be inserted. |
| 360 | func (shard *TableShard) writeBatchRecords(columnDeletions []bool, |
| 361 | upsertBatch *common.UpsertBatch, batchID int32, records []recordInfo, forUpdate bool) error { |
| 362 | var batch *LiveBatch |
| 363 | if forUpdate { |
| 364 | // We need to lock the batch for update to achieve row level consistency. |
| 365 | batch = shard.LiveStore.GetBatchForWrite(batchID) |
| 366 | defer batch.Unlock() |
| 367 | } else { |
| 368 | // Make sure all columns are created. |
| 369 | batch = shard.LiveStore.GetBatchForWrite(batchID) |
| 370 | for i := 0; i < upsertBatch.NumColumns; i++ { |
| 371 | columnID, _ := upsertBatch.GetColumnID(i) |
| 372 | if columnDeletions[columnID] { |
| 373 | continue |
| 374 | } |
| 375 | batch.GetOrCreateVectorParty(columnID, true) |
| 376 | } |
| 377 | batch.Unlock() |
| 378 | |
| 379 | batch.RLock() |
| 380 | defer batch.RUnlock() |
| 381 | } |
| 382 | |
| 383 | if batch.MaxArrivalTime < upsertBatch.ArrivalTime { |
| 384 | batch.MaxArrivalTime = upsertBatch.ArrivalTime |
| 385 | } |
| 386 | |
| 387 | // Rather than traversing row by row, we go column by column to avoid making checks on each row. |
| 388 | for col := 0; col < upsertBatch.NumColumns; col++ { |
| 389 | columnID, err := upsertBatch.GetColumnID(col) |
| 390 | if err != nil { |
| 391 | return utils.StackError(err, "Failed to get column id for col %d", col) |
| 392 | } |
| 393 | if columnDeletions[columnID] { |
| 394 | continue |
| 395 | } |
| 396 | |
| 397 | columnUpdateMode := upsertBatch.GetColumnUpdateMode(col) |
| 398 | columnMode := upsertBatch.GetColumMode(col) |
| 399 | |
| 400 | // we will skip processing this column if both of the following hold: |
| 401 | // 1. columnMode is AllValuesDefault |
| 402 | // 2. columnUpdateMode is UpdateOverwriteNotNull |
| 403 | if columnMode == common.AllValuesDefault && columnUpdateMode == common.UpdateOverwriteNotNull { |
| 404 | continue |
| 405 | } |
| 406 | |
| 407 | if col >= upsertBatch.GetColumnLen() { |
| 408 | return utils.StackError(nil, "Column index %d out of range %d", col, upsertBatch.GetColumnLen()) |
| 409 | } |
| 410 | |
| 411 | vectorParty := batch.GetOrCreateVectorParty(columnID, true) |
| 412 | dataType, _ := upsertBatch.GetColumnType(col) |
| 413 | cmpFunc := common.GetCompareFunc(dataType) |
| 414 | |
| 415 | // check whether the update mode is valid based on data type. |
| 416 | forceWrite := false |
| 417 | if forUpdate { |
no test coverage detected