transferLiveBatch returns a functor that transfers a live batch to device memory. The size parameter is either the full size of the batch or, for the last batch, the number of records it holds. The returned host vector parties (hostVPs) are always empty, since a vector party of a live batch should not be released. The start row is always zero as well.
(batch *memstore.LiveBatch, size int)
| 503 | // size of the batch or num records in last batch. hostColumns will always be empty since we should not release a vector |
| 504 | // party of a live batch. Start row will always be zero as well. |
| 505 | func (qc *AQLQueryContext) transferLiveBatch(batch *memstore.LiveBatch, size int) batchTransferExecutor { |
| 506 | return func(stream unsafe.Pointer) (deviceColumns []deviceVectorPartySlice, hostVPs []memCom.VectorParty, |
| 507 | firstColumn, startRow, totalBytes, numTransfers, sizeAfterPrefilter int) { |
| 508 | // Allocate column inputs. |
| 509 | firstColumn = -1 |
| 510 | deviceColumns = make([]deviceVectorPartySlice, len(qc.TableScanners[0].Columns)) |
| 511 | for i, columnID := range qc.TableScanners[0].Columns { |
| 512 | usage := qc.TableScanners[0].ColumnUsages[columnID] |
| 513 | if usage&(columnUsedByAllBatches|columnUsedByLiveBatches) != 0 { |
| 514 | if firstColumn < 0 { |
| 515 | firstColumn = i |
| 516 | } |
| 517 | sourceVP := batch.Columns[columnID] |
| 518 | if sourceVP == nil { |
| 519 | continue |
| 520 | } |
| 521 | |
| 522 | hostColumn := sourceVP.(memstore.TransferableVectorParty).GetHostVectorPartySlice(0, size) |
| 523 | deviceColumns[i] = hostToDeviceColumn(hostColumn, qc.Device) |
| 524 | b, t := copyHostToDevice(hostColumn, deviceColumns[i], stream, qc.Device) |
| 525 | totalBytes += b |
| 526 | numTransfers += t |
| 527 | } |
| 528 | } |
| 529 | sizeAfterPrefilter = size |
| 530 | return |
| 531 | } |
| 532 | } |
| 533 | |
| 534 | // liveBatchCustomFilterExecutor returns a functor to apply custom time filters to a live batch. |
| 535 | func (qc *AQLQueryContext) liveBatchCustomFilterExecutor(cutoff uint32) customFilterExecutor { |
no test coverage detected