Prepare the foreign table (allocate and transfer memory) before processing.
(memStore memstore.MemStore, joinTableID int, join queryCom.Join)
| 388 | |
| 389 | // prepare foreign table (allocate and transfer memory) before processing |
| 390 | func (qc *AQLQueryContext) prepareForeignTable(memStore memstore.MemStore, joinTableID int, join queryCom.Join) { |
| 391 | ft := qc.OOPK.foreignTables[joinTableID] |
| 392 | if ft == nil { |
| 393 | return |
| 394 | } |
| 395 | |
| 396 | // join only support dimension table for now |
| 397 | // and dimension table is not shared |
| 398 | shard, err := memStore.GetTableShard(join.Table, 0) |
| 399 | if err != nil { |
| 400 | qc.Error = utils.StackError(err, "Failed to get shard for table %s, shard: %d", join.Table, 0) |
| 401 | return |
| 402 | } |
| 403 | defer shard.Users.Done() |
| 404 | |
| 405 | // only need live store for dimension table |
| 406 | batchIDs, numRecordsInLastBatch := shard.LiveStore.GetBatchIDs() |
| 407 | ft.numRecordsInLastBatch = numRecordsInLastBatch |
| 408 | deviceBatches := make([][]deviceVectorPartySlice, len(batchIDs)) |
| 409 | |
| 410 | // transfer primary key |
| 411 | hostPrimaryKeyData := shard.LiveStore.PrimaryKey.LockForTransfer() |
| 412 | devicePrimaryKeyPtr := deviceAllocate(hostPrimaryKeyData.NumBytes, qc.Device) |
| 413 | cgoutils.AsyncCopyHostToDevice(devicePrimaryKeyPtr.getPointer(), hostPrimaryKeyData.Data, hostPrimaryKeyData.NumBytes, qc.cudaStreams[0], qc.Device) |
| 414 | cgoutils.WaitForCudaStream(qc.cudaStreams[0], qc.Device) |
| 415 | ft.hostPrimaryKeyData = hostPrimaryKeyData |
| 416 | ft.devicePrimaryKeyPtr = devicePrimaryKeyPtr |
| 417 | shard.LiveStore.PrimaryKey.UnlockAfterTransfer() |
| 418 | |
| 419 | // allocate device memory |
| 420 | for i, batchID := range batchIDs { |
| 421 | batch := shard.LiveStore.GetBatchForRead(batchID) |
| 422 | if batch == nil { |
| 423 | continue |
| 424 | } |
| 425 | batchIndex := batchID - memstore.BaseBatchID |
| 426 | deviceBatches[batchIndex] = make([]deviceVectorPartySlice, len(qc.TableScanners[joinTableID+1].Columns)) |
| 427 | |
| 428 | size := batch.Capacity |
| 429 | if i == len(batchIDs)-1 { |
| 430 | size = numRecordsInLastBatch |
| 431 | } |
| 432 | for i, columnID := range qc.TableScanners[joinTableID+1].Columns { |
| 433 | usage := qc.TableScanners[joinTableID+1].ColumnUsages[columnID] |
| 434 | if usage&(columnUsedByAllBatches|columnUsedByLiveBatches) != 0 { |
| 435 | sourceVP := batch.Columns[columnID] |
| 436 | if sourceVP == nil { |
| 437 | continue |
| 438 | } |
| 439 | |
| 440 | hostVPSlice := sourceVP.(memstore.TransferableVectorParty).GetHostVectorPartySlice(0, size) |
| 441 | deviceBatches[batchIndex][i] = hostToDeviceColumn(hostVPSlice, qc.Device) |
| 442 | copyHostToDevice(hostVPSlice, deviceBatches[batchIndex][i], qc.cudaStreams[0], qc.Device) |
| 443 | } |
| 444 | } |
| 445 | cgoutils.WaitForCudaStream(qc.cudaStreams[0], qc.Device) |
| 446 | batch.RUnlock() |
| 447 | } |
no test coverage detected