reduce aggregates measures based on dimensions and the aggregation function
()
| 217 | |
| 218 | // reduce is to aggregate measures based on dimensions and aggregation function |
| 219 | func (e *BatchExecutorImpl) reduce() { |
| 220 | // init dimIndexVectorD for sorting and reducing |
| 221 | if e.qc.OOPK.IsHLL() { |
| 222 | initIndexVector(e.qc.OOPK.currentBatch.dimIndexVectorD[0].getPointer(), 0, e.qc.OOPK.currentBatch.resultSize, e.stream, e.qc.Device) |
| 223 | initIndexVector(e.qc.OOPK.currentBatch.dimIndexVectorD[1].getPointer(), e.qc.OOPK.currentBatch.resultSize, e.qc.OOPK.currentBatch.resultSize+e.qc.OOPK.currentBatch.size, e.stream, e.qc.Device) |
| 224 | } else if !e.qc.OOPK.UseHashReduction() { |
| 225 | initIndexVector(e.qc.OOPK.currentBatch.dimIndexVectorD[0].getPointer(), 0, e.qc.OOPK.currentBatch.resultSize+e.qc.OOPK.currentBatch.size, e.stream, e.qc.Device) |
| 226 | } |
| 227 | |
| 228 | if e.qc.OOPK.IsHLL() { |
| 229 | e.qc.doProfile(func() { |
| 230 | e.qc.OOPK.hllVectorD, e.qc.OOPK.hllDimRegIDCountD, e.qc.OOPK.hllVectorSize = |
| 231 | e.qc.OOPK.currentBatch.hll(e.qc.OOPK.NumDimsPerDimWidth, e.isLastBatch, e.stream, e.qc.Device) |
| 232 | e.qc.reportTimingForCurrentBatch(e.stream, &e.start, hllEvalTiming) |
| 233 | }, "hll", e.stream) |
| 234 | } else if e.qc.OOPK.UseHashReduction() { |
| 235 | e.qc.doProfile(func() { |
| 236 | e.qc.OOPK.currentBatch.hashReduce( |
| 237 | e.qc.OOPK.NumDimsPerDimWidth, e.qc.OOPK.MeasureBytes, e.qc.OOPK.AggregateType, e.stream, e.qc.Device) |
| 238 | e.qc.reportTimingForCurrentBatch(e.stream, &e.start, hashReduceEvalTiming) |
| 239 | }, "hash_reduce", e.stream) |
| 240 | } else { |
| 241 | // sort by key. |
| 242 | e.qc.doProfile(func() { |
| 243 | e.qc.OOPK.currentBatch.sortByKey(e.qc.OOPK.NumDimsPerDimWidth, e.stream, e.qc.Device) |
| 244 | e.qc.reportTimingForCurrentBatch(e.stream, &e.start, sortEvalTiming) |
| 245 | }, "sort", e.stream) |
| 246 | |
| 247 | // reduce by key. |
| 248 | e.qc.doProfile(func() { |
| 249 | e.qc.OOPK.currentBatch.reduceByKey(e.qc.OOPK.NumDimsPerDimWidth, e.qc.OOPK.MeasureBytes, e.qc.OOPK.AggregateType, e.stream, e.qc.Device) |
| 250 | e.qc.reportTimingForCurrentBatch(e.stream, &e.start, reduceEvalTiming) |
| 251 | }, "reduce", e.stream) |
| 252 | } |
| 253 | cgoutils.WaitForCudaStream(e.stream, e.qc.Device) |
| 254 | } |
| 255 | |
| 256 | func (e *BatchExecutorImpl) preExec(isLastBatch bool, start time.Time) { |
| 257 | e.isLastBatch = isLastBatch |
nothing calls this directly
no test coverage detected