join
()
| 114 | |
| 115 | // join |
| 116 | func (e *BatchExecutorImpl) join() { |
| 117 | e.qc.doProfile(func() { |
| 118 | // join foreign tables |
| 119 | for joinTableID, foreignTable := range e.qc.OOPK.foreignTables { |
| 120 | if foreignTable != nil { |
| 121 | // prepare foreign table recordIDs |
| 122 | // Note: |
| 123 | // RecordID { |
| 124 | // int32_t batchID |
| 125 | // uint32_t index |
| 126 | // } |
| 127 | // takes up 8 bytes |
| 128 | e.qc.OOPK.currentBatch.foreignTableRecordIDsD = append(e.qc.OOPK.currentBatch.foreignTableRecordIDsD, deviceAllocate(8*e.qc.OOPK.currentBatch.size, e.qc.Device)) |
| 129 | mainTableJoinColumnIndex := e.qc.TableScanners[0].ColumnsByIDs[foreignTable.remoteJoinColumn.ColumnID] |
| 130 | // perform hash lookup |
| 131 | e.qc.OOPK.currentBatch.prepareForeignRecordIDs(mainTableJoinColumnIndex, joinTableID, *foreignTable, e.stream, e.qc.Device) |
| 132 | } |
| 133 | } |
| 134 | e.qc.reportTimingForCurrentBatch(e.stream, &e.start, prepareForeignRecordIDsTiming) |
| 135 | }, "joins", e.stream) |
| 136 | |
| 137 | e.qc.doProfile(func() { |
| 138 | // process filters that involves foreign table columns if any |
| 139 | for _, filter := range e.qc.OOPK.ForeignTableCommonFilters { |
| 140 | e.qc.OOPK.currentBatch.processExpression(filter, nil, |
| 141 | e.qc.TableScanners, e.qc.OOPK.foreignTables, e.stream, e.qc.Device, e.qc.OOPK.currentBatch.filterAction) |
| 142 | } |
| 143 | e.qc.reportTimingForCurrentBatch(e.stream, &e.start, foreignTableFilterEvalTiming) |
| 144 | }, "filters", e.stream) |
| 145 | |
| 146 | if e.qc.OOPK.geoIntersection != nil { |
| 147 | // allocate two predicate vector for geo intersect |
| 148 | numWords := (e.qc.OOPK.geoIntersection.numShapes + 31) / 32 |
| 149 | e.qc.OOPK.currentBatch.geoPredicateVectorD = deviceAllocate(e.qc.OOPK.currentBatch.size*4*numWords, e.qc.Device) |
| 150 | } |
| 151 | |
| 152 | e.sizeBeforeGeoFilter = e.qc.OOPK.currentBatch.size |
| 153 | e.qc.doProfile(func() { |
| 154 | if e.qc.OOPK.geoIntersection != nil { |
| 155 | pointColumnIndex := e.qc.TableScanners[e.qc.OOPK.geoIntersection.pointTableID]. |
| 156 | ColumnsByIDs[e.qc.OOPK.geoIntersection.pointColumnID] |
| 157 | e.qc.OOPK.currentBatch.geoIntersect( |
| 158 | e.qc.OOPK.geoIntersection, |
| 159 | pointColumnIndex, |
| 160 | e.qc.OOPK.foreignTables, |
| 161 | e.qc.OOPK.currentBatch.geoPredicateVectorD, |
| 162 | e.stream, e.qc.Device) |
| 163 | } |
| 164 | e.qc.reportTimingForCurrentBatch(e.stream, &e.start, geoIntersectEvalTiming) |
| 165 | }, "geo_intersect", e.stream) |
| 166 | } |
| 167 | |
| 168 | // evalMeasures is to fill measure values |
| 169 | func (e *BatchExecutorImpl) evalMeasures() { |
nothing calls this directly
no test coverage detected