MCPcopy
hub / github.com/uber/aresdb / writeBatchRecords

Method writeBatchRecords

memstore/ingestion.go:360–486  ·  view source on GitHub ↗

Reads rows from a batch group and writes them to the memStore. A batch ID of 0 is for records to be inserted.

(columnDeletions []bool,
	upsertBatch *common.UpsertBatch, batchID int32, records []recordInfo, forUpdate bool)

Source from the content-addressed store, hash-verified

358
359// Read rows from a batch group and write to memStore. Batch id = 0 is for records to be inserted.
360func (shard *TableShard) writeBatchRecords(columnDeletions []bool,
361 upsertBatch *common.UpsertBatch, batchID int32, records []recordInfo, forUpdate bool) error {
362 var batch *LiveBatch
363 if forUpdate {
364 // We need to lock the batch for update to achieve row level consistency.
365 batch = shard.LiveStore.GetBatchForWrite(batchID)
366 defer batch.Unlock()
367 } else {
368 // Make sure all columns are created.
369 batch = shard.LiveStore.GetBatchForWrite(batchID)
370 for i := 0; i < upsertBatch.NumColumns; i++ {
371 columnID, _ := upsertBatch.GetColumnID(i)
372 if columnDeletions[columnID] {
373 continue
374 }
375 batch.GetOrCreateVectorParty(columnID, true)
376 }
377 batch.Unlock()
378
379 batch.RLock()
380 defer batch.RUnlock()
381 }
382
383 if batch.MaxArrivalTime < upsertBatch.ArrivalTime {
384 batch.MaxArrivalTime = upsertBatch.ArrivalTime
385 }
386
387 // Instead of traversing row by row, we instead do column by column to avoid making checks on each row.
388 for col := 0; col < upsertBatch.NumColumns; col++ {
389 columnID, err := upsertBatch.GetColumnID(col)
390 if err != nil {
391 return utils.StackError(err, "Failed to get column id for col %d", col)
392 }
393 if columnDeletions[columnID] {
394 continue
395 }
396
397 columnUpdateMode := upsertBatch.GetColumnUpdateMode(col)
398 columnMode := upsertBatch.GetColumMode(col)
399
400 // we will skip processing this column if
401 // 1. columnMode is AllValuesDefault
402 // 2. columnUpdateMode is UpdateOverwriteNotNull
403 if columnMode == common.AllValuesDefault && columnUpdateMode == common.UpdateOverwriteNotNull {
404 continue
405 }
406
407 if col >= upsertBatch.GetColumnLen() {
408 return utils.StackError(nil, "Column index %d out of range %d", col, upsertBatch.GetColumnLen())
409 }
410
411 vectorParty := batch.GetOrCreateVectorParty(columnID, true)
412 dataType, _ := upsertBatch.GetColumnType(col)
413 cmpFunc := common.GetCompareFunc(dataType)
414
415 // check whether the update mode is valid based on data type.
416 forceWrite := false
417 if forUpdate {

Callers 1

ApplyUpsertBatch (Method) · 0.95

Calls 15

StackError (Function) · 0.92
GetCompareFunc (Function) · 0.92
IsNumeric (Function) · 0.92
IsGoType (Function) · 0.92
AdditionUpdate (Function) · 0.92
MinMaxUpdate (Function) · 0.92
GetBatchForWrite (Method) · 0.80
GetColumnUpdateMode (Method) · 0.80
GetColumMode (Method) · 0.80
GetColumnLen (Method) · 0.80
GetColumnType (Method) · 0.80

Tested by

no test coverage detected