MCPcopy
hub / github.com/uber/aresdb / processBatch

Method processBatch

query/aql_processor.go:813–907  ·  view source on GitHub ↗

processBatch allocates device memory and starts the asynchronous transfer of input data to device memory. It then invokes previousBatchExecutor asynchronously to process the previous batch. When both async operations finish, it prepares for the current batch execution and returns it as a function closure to be invoked later.

(
	batch *memCom.Batch, batchID int32, batchSize int, transferFunc batchTransferExecutor,
	customFilterFunc customFilterExecutor, previousBatchExecutor BatchExecutor, needToUnlockBatch bool)

Source from the content-addressed store, hash-verified

811// a function closure to be invoked later. customFilterExecutor is the executor
812// to apply custom filters for live batch and archive batch.
813func (qc *AQLQueryContext) processBatch(
814 batch *memCom.Batch, batchID int32, batchSize int, transferFunc batchTransferExecutor,
815 customFilterFunc customFilterExecutor, previousBatchExecutor BatchExecutor, needToUnlockBatch bool) BatchExecutor {
816 defer func() {
817 if needToUnlockBatch {
818 batch.RUnlock()
819 }
820 }()
821
822 if qc.Debug {
823 // Finish executing previous batch first to avoid timeline overlapping
824 qc.runBatchExecutor(previousBatchExecutor, false)
825 previousBatchExecutor = NewDummyBatchExecutor()
826 }
827
828 // reset stats.
829 qc.OOPK.currentBatch.stats = oopkBatchStats{
830 batchID: batchID,
831 timings: make(map[stageName]float64),
832 }
833 start := utils.Now()
834
835 // Async transfer.
836 stream := qc.cudaStreams[0]
837 deviceSlices, hostVPs, firstColumn, startRow, totalBytes, numTransfers, sizeAfterPreFilter := transferFunc(stream)
838 qc.OOPK.currentBatch.stats.bytesTransferred += totalBytes
839 qc.OOPK.currentBatch.stats.numTransferCalls += numTransfers
840
841 qc.reportTimingForCurrentBatch(stream, &start, transferTiming)
842
843 // Async execute the previous batch.
844 executionDone := make(chan struct{ error }, 1)
845 go func() {
846 defer func() {
847 if r := recover(); r != nil {
848 var err error
849 // find out exactly what the error was and set err
850 switch x := r.(type) {
851 case string:
852 err = utils.StackError(nil, x)
853 case error:
854 err = utils.StackError(x, "Panic happens when executing query")
855 default:
856 err = utils.StackError(nil, "Panic happens when executing query %v", x)
857 }
858 executionDone <- struct{ error }{err}
859 }
860 }()
861 qc.runBatchExecutor(previousBatchExecutor, false)
862 executionDone <- struct{ error }{}
863 }()
864
865 // Wait for data transfer of the current batch.
866 cgoutils.WaitForCudaStream(stream, qc.Device)
867
868 for _, vp := range hostVPs {
869 if vp != nil {
870 // only archive vector party will be returned after transfer function

Callers 1

processShard (Method) · 0.95

Calls 11

runBatchExecutor (Method) · 0.95
Now (Function) · 0.92
StackError (Function) · 0.92
WaitForCudaStream (Function) · 0.92
NewDummyBatchExecutor (Function) · 0.85
deviceFreeAndSetNil (Function) · 0.85
NewBatchExecutor (Function) · 0.85
prepareForFiltering (Method) · 0.80
RUnlock (Method) · 0.65
Release (Method) · 0.65

Tested by

no test coverage detected