processBatch allocates device memory and starts the asynchronous transfer of input data to device memory. It then invokes previousBatchExecutor asynchronously to process the previous batch. When both async operations finish, it prepares for the current batch execution and returns it as a function closure to be invoked later.
( batch *memCom.Batch, batchID int32, batchSize int, transferFunc batchTransferExecutor, customFilterFunc customFilterExecutor, previousBatchExecutor BatchExecutor, needToUnlockBatch bool)
| 811 | // a function closure to be invoked later. customFilterExecutor is the executor |
| 812 | // to apply custom filters for live batch and archive batch. |
| 813 | func (qc *AQLQueryContext) processBatch( |
| 814 | batch *memCom.Batch, batchID int32, batchSize int, transferFunc batchTransferExecutor, |
| 815 | customFilterFunc customFilterExecutor, previousBatchExecutor BatchExecutor, needToUnlockBatch bool) BatchExecutor { |
| 816 | defer func() { |
| 817 | if needToUnlockBatch { |
| 818 | batch.RUnlock() |
| 819 | } |
| 820 | }() |
| 821 | |
| 822 | if qc.Debug { |
| 823 | // Finish executing previous batch first to avoid timeline overlapping |
| 824 | qc.runBatchExecutor(previousBatchExecutor, false) |
| 825 | previousBatchExecutor = NewDummyBatchExecutor() |
| 826 | } |
| 827 | |
| 828 | // reset stats. |
| 829 | qc.OOPK.currentBatch.stats = oopkBatchStats{ |
| 830 | batchID: batchID, |
| 831 | timings: make(map[stageName]float64), |
| 832 | } |
| 833 | start := utils.Now() |
| 834 | |
| 835 | // Async transfer. |
| 836 | stream := qc.cudaStreams[0] |
| 837 | deviceSlices, hostVPs, firstColumn, startRow, totalBytes, numTransfers, sizeAfterPreFilter := transferFunc(stream) |
| 838 | qc.OOPK.currentBatch.stats.bytesTransferred += totalBytes |
| 839 | qc.OOPK.currentBatch.stats.numTransferCalls += numTransfers |
| 840 | |
| 841 | qc.reportTimingForCurrentBatch(stream, &start, transferTiming) |
| 842 | |
| 843 | // Async execute the previous batch. |
| 844 | executionDone := make(chan struct{ error }, 1) |
| 845 | go func() { |
| 846 | defer func() { |
| 847 | if r := recover(); r != nil { |
| 848 | var err error |
| 849 | // find out exactly what the error was and set err |
| 850 | switch x := r.(type) { |
| 851 | case string: |
| 852 | err = utils.StackError(nil, x) |
| 853 | case error: |
| 854 | err = utils.StackError(x, "Panic happens when executing query") |
| 855 | default: |
| 856 | err = utils.StackError(nil, "Panic happens when executing query %v", x) |
| 857 | } |
| 858 | executionDone <- struct{ error }{err} |
| 859 | } |
| 860 | }() |
| 861 | qc.runBatchExecutor(previousBatchExecutor, false) |
| 862 | executionDone <- struct{ error }{} |
| 863 | }() |
| 864 | |
| 865 | // Wait for data transfer of the current batch. |
| 866 | cgoutils.WaitForCudaStream(stream, qc.Device) |
| 867 | |
| 868 | for _, vp := range hostVPs { |
| 869 | if vp != nil { |
| 870 | // only archive vector party will be returned after transfer function |
no test coverage detected