MCPcopy
hub / github.com/uber/aresdb / transferArchiveBatch

Method transferArchiveBatch

query/aql_processor.go:563–616  ·  view source on GitHub ↗

transferArchiveBatch returns a functor that transfers an archive batch to device memory. The host vector parties it returns (hostVPs) must be released after the transfer completes.

(batch *memstore.ArchiveBatch,
	isFirstOrLast bool)

Source from the content-addressed store, hash-verified

561// transferArchiveBatch returns the functor to transfer an archive batch to device memory. We will need to release
562// hostColumns after transfer completes.
563func (qc *AQLQueryContext) transferArchiveBatch(batch *memstore.ArchiveBatch,
564 isFirstOrLast bool) batchTransferExecutor {
565 return func(stream unsafe.Pointer) (deviceSlices []deviceVectorPartySlice, hostVPs []memCom.VectorParty,
566 firstColumn, startRow, totalBytes, numTransfers, sizeAfterPreFilter int) {
567 matchedColumnUsages := columnUsedByAllBatches
568 if isFirstOrLast {
569 matchedColumnUsages |= columnUsedByFirstArchiveBatch | columnUsedByLastArchiveBatch
570 }
571
572 // Request columns, prefilter-slicing, allocate column inputs.
573 firstColumn = -1
574 hostVPs = make([]memCom.VectorParty, len(qc.TableScanners[0].Columns))
575 hostSlices := make([]memCom.HostVectorPartySlice, len(qc.TableScanners[0].Columns))
576 deviceSlices = make([]deviceVectorPartySlice, len(qc.TableScanners[0].Columns))
577 endRow := batch.Size
578 prefilterIndex := 0
579 // Must iterate in reverse order to apply prefilter slicing properly.
580 for i := len(qc.TableScanners[0].Columns) - 1; i >= 0; i-- {
581 columnID := qc.TableScanners[0].Columns[i]
582 usage := qc.TableScanners[0].ColumnUsages[columnID]
583
584 if usage&matchedColumnUsages != 0 || usage&columnUsedByPrefilter != 0 {
585 // Request/pin column from disk and wait.
586 vp := batch.RequestVectorParty(columnID)
587 vp.WaitForDiskLoad()
588
589 // prefilter slicing
590 startRow, endRow, hostSlices[i] = qc.prefilterSlice(vp, prefilterIndex, startRow, endRow)
591 prefilterIndex++
592
593 if usage&matchedColumnUsages != 0 {
594 hostVPs[i] = vp
595 firstColumn = i
596 deviceSlices[i] = hostToDeviceColumn(hostSlices[i], qc.Device)
597 } else {
598 vp.Release()
599 }
600 }
601 }
602
603 for i, dstVPSlice := range deviceSlices {
604 columnID := qc.TableScanners[0].Columns[i]
605 usage := qc.TableScanners[0].ColumnUsages[columnID]
606 if usage&matchedColumnUsages != 0 {
607 srcVPSlice := hostSlices[i]
608 b, t := copyHostToDevice(srcVPSlice, dstVPSlice, stream, qc.Device)
609 totalBytes += b
610 numTransfers += t
611 }
612 }
613 sizeAfterPreFilter = endRow - startRow
614 return
615 }
616}
617
618// archiveBatchCustomFilterExecutor returns a functor to apply custom filter to first or last archive batch.
619func (qc *AQLQueryContext) archiveBatchCustomFilterExecutor(isFirstOrLast bool) customFilterExecutor {

Callers 1

processShardMethod · 0.95

Calls 6

prefilterSliceMethod · 0.95
hostToDeviceColumnFunction · 0.85
copyHostToDeviceFunction · 0.85
RequestVectorPartyMethod · 0.80
WaitForDiskLoadMethod · 0.65
ReleaseMethod · 0.65

Tested by

no test coverage detected