transferArchiveBatch returns the functor to transfer an archive batch to device memory. We will need to release hostColumns after transfer completes.
(batch *memstore.ArchiveBatch, isFirstOrLast bool)
| 561 | // transferArchiveBatch returns the functor to transfer an archive batch to device memory. We will need to release |
| 562 | // hostColumns after transfer completes. |
| 563 | func (qc *AQLQueryContext) transferArchiveBatch(batch *memstore.ArchiveBatch, |
| 564 | isFirstOrLast bool) batchTransferExecutor { |
| 565 | return func(stream unsafe.Pointer) (deviceSlices []deviceVectorPartySlice, hostVPs []memCom.VectorParty, |
| 566 | firstColumn, startRow, totalBytes, numTransfers, sizeAfterPreFilter int) { |
| 567 | matchedColumnUsages := columnUsedByAllBatches |
| 568 | if isFirstOrLast { |
| 569 | matchedColumnUsages |= columnUsedByFirstArchiveBatch | columnUsedByLastArchiveBatch |
| 570 | } |
| 571 | |
| 572 | // Request columns, prefilter-slicing, allocate column inputs. |
| 573 | firstColumn = -1 |
| 574 | hostVPs = make([]memCom.VectorParty, len(qc.TableScanners[0].Columns)) |
| 575 | hostSlices := make([]memCom.HostVectorPartySlice, len(qc.TableScanners[0].Columns)) |
| 576 | deviceSlices = make([]deviceVectorPartySlice, len(qc.TableScanners[0].Columns)) |
| 577 | endRow := batch.Size |
| 578 | prefilterIndex := 0 |
| 579 | // Must iterate in reverse order to apply prefilter slicing properly. |
| 580 | for i := len(qc.TableScanners[0].Columns) - 1; i >= 0; i-- { |
| 581 | columnID := qc.TableScanners[0].Columns[i] |
| 582 | usage := qc.TableScanners[0].ColumnUsages[columnID] |
| 583 | |
| 584 | if usage&matchedColumnUsages != 0 || usage&columnUsedByPrefilter != 0 { |
| 585 | // Request/pin column from disk and wait. |
| 586 | vp := batch.RequestVectorParty(columnID) |
| 587 | vp.WaitForDiskLoad() |
| 588 | |
| 589 | // prefilter slicing |
| 590 | startRow, endRow, hostSlices[i] = qc.prefilterSlice(vp, prefilterIndex, startRow, endRow) |
| 591 | prefilterIndex++ |
| 592 | |
| 593 | if usage&matchedColumnUsages != 0 { |
| 594 | hostVPs[i] = vp |
| 595 | firstColumn = i |
| 596 | deviceSlices[i] = hostToDeviceColumn(hostSlices[i], qc.Device) |
| 597 | } else { |
| 598 | vp.Release() |
| 599 | } |
| 600 | } |
| 601 | } |
| 602 | |
| 603 | for i, dstVPSlice := range deviceSlices { |
| 604 | columnID := qc.TableScanners[0].Columns[i] |
| 605 | usage := qc.TableScanners[0].ColumnUsages[columnID] |
| 606 | if usage&matchedColumnUsages != 0 { |
| 607 | srcVPSlice := hostSlices[i] |
| 608 | b, t := copyHostToDevice(srcVPSlice, dstVPSlice, stream, qc.Device) |
| 609 | totalBytes += b |
| 610 | numTransfers += t |
| 611 | } |
| 612 | } |
| 613 | sizeAfterPreFilter = endRow - startRow |
| 614 | return |
| 615 | } |
| 616 | } |
| 617 | |
| 618 | // archiveBatchCustomFilterExecutor returns a functor to apply custom filter to first or last archive batch. |
| 619 | func (qc *AQLQueryContext) archiveBatchCustomFilterExecutor(isFirstOrLast bool) customFilterExecutor { |
no test coverage detected