MCPcopy
hub / github.com/uber/aresdb / prepareForeignTable

Method prepareForeignTable

query/aql_processor.go:390–449  ·  view source on GitHub ↗

Prepares the foreign table before processing by allocating device memory and transferring the table's data into it.

(memStore memstore.MemStore, joinTableID int, join queryCom.Join)

Source from the content-addressed store, hash-verified

388
389// prepare foreign table (allocate and transfer memory) before processing
390func (qc *AQLQueryContext) prepareForeignTable(memStore memstore.MemStore, joinTableID int, join queryCom.Join) {
391 ft := qc.OOPK.foreignTables[joinTableID]
392 if ft == nil {
393 return
394 }
395
396 // join only support dimension table for now
397 // and dimension table is not shared
398 shard, err := memStore.GetTableShard(join.Table, 0)
399 if err != nil {
400 qc.Error = utils.StackError(err, "Failed to get shard for table %s, shard: %d", join.Table, 0)
401 return
402 }
403 defer shard.Users.Done()
404
405 // only need live store for dimension table
406 batchIDs, numRecordsInLastBatch := shard.LiveStore.GetBatchIDs()
407 ft.numRecordsInLastBatch = numRecordsInLastBatch
408 deviceBatches := make([][]deviceVectorPartySlice, len(batchIDs))
409
410 // transfer primary key
411 hostPrimaryKeyData := shard.LiveStore.PrimaryKey.LockForTransfer()
412 devicePrimaryKeyPtr := deviceAllocate(hostPrimaryKeyData.NumBytes, qc.Device)
413 cgoutils.AsyncCopyHostToDevice(devicePrimaryKeyPtr.getPointer(), hostPrimaryKeyData.Data, hostPrimaryKeyData.NumBytes, qc.cudaStreams[0], qc.Device)
414 cgoutils.WaitForCudaStream(qc.cudaStreams[0], qc.Device)
415 ft.hostPrimaryKeyData = hostPrimaryKeyData
416 ft.devicePrimaryKeyPtr = devicePrimaryKeyPtr
417 shard.LiveStore.PrimaryKey.UnlockAfterTransfer()
418
419 // allocate device memory
420 for i, batchID := range batchIDs {
421 batch := shard.LiveStore.GetBatchForRead(batchID)
422 if batch == nil {
423 continue
424 }
425 batchIndex := batchID - memstore.BaseBatchID
426 deviceBatches[batchIndex] = make([]deviceVectorPartySlice, len(qc.TableScanners[joinTableID+1].Columns))
427
428 size := batch.Capacity
429 if i == len(batchIDs)-1 {
430 size = numRecordsInLastBatch
431 }
432 for i, columnID := range qc.TableScanners[joinTableID+1].Columns {
433 usage := qc.TableScanners[joinTableID+1].ColumnUsages[columnID]
434 if usage&(columnUsedByAllBatches|columnUsedByLiveBatches) != 0 {
435 sourceVP := batch.Columns[columnID]
436 if sourceVP == nil {
437 continue
438 }
439
440 hostVPSlice := sourceVP.(memstore.TransferableVectorParty).GetHostVectorPartySlice(0, size)
441 deviceBatches[batchIndex][i] = hostToDeviceColumn(hostVPSlice, qc.Device)
442 copyHostToDevice(hostVPSlice, deviceBatches[batchIndex][i], qc.cudaStreams[0], qc.Device)
443 }
444 }
445 cgoutils.WaitForCudaStream(qc.cudaStreams[0], qc.Device)
446 batch.RUnlock()
447 }

Callers 1

ProcessQuery (method) · 0.95

Calls 15

StackError (function) · 0.92
AsyncCopyHostToDevice (function) · 0.92
WaitForCudaStream (function) · 0.92
deviceAllocate (function) · 0.85
hostToDeviceColumn (function) · 0.85
copyHostToDevice (function) · 0.85
GetBatchIDs (method) · 0.80
getPointer (method) · 0.80
GetTableShard (method) · 0.65
Done (method) · 0.65
LockForTransfer (method) · 0.65
UnlockAfterTransfer (method) · 0.65

Tested by

no test coverage detected