MCPcopy
hub / github.com/uber/aresdb / join

Method join

query/aql_batchexecutor.go:116–166  ·  view source on GitHub ↗

join

()

Source from the content-addressed store, hash-verified

114
115// join
116func (e *BatchExecutorImpl) join() {
117 e.qc.doProfile(func() {
118 // join foreign tables
119 for joinTableID, foreignTable := range e.qc.OOPK.foreignTables {
120 if foreignTable != nil {
121 // prepare foreign table recordIDs
122 // Note:
123 // RecordID {
124 // int32_t batchID
125 // uint32_t index
126 // }
127 // takes up 8 bytes
128 e.qc.OOPK.currentBatch.foreignTableRecordIDsD = append(e.qc.OOPK.currentBatch.foreignTableRecordIDsD, deviceAllocate(8*e.qc.OOPK.currentBatch.size, e.qc.Device))
129 mainTableJoinColumnIndex := e.qc.TableScanners[0].ColumnsByIDs[foreignTable.remoteJoinColumn.ColumnID]
130 // perform hash lookup
131 e.qc.OOPK.currentBatch.prepareForeignRecordIDs(mainTableJoinColumnIndex, joinTableID, *foreignTable, e.stream, e.qc.Device)
132 }
133 }
134 e.qc.reportTimingForCurrentBatch(e.stream, &e.start, prepareForeignRecordIDsTiming)
135 }, "joins", e.stream)
136
137 e.qc.doProfile(func() {
138 // process filters that involves foreign table columns if any
139 for _, filter := range e.qc.OOPK.ForeignTableCommonFilters {
140 e.qc.OOPK.currentBatch.processExpression(filter, nil,
141 e.qc.TableScanners, e.qc.OOPK.foreignTables, e.stream, e.qc.Device, e.qc.OOPK.currentBatch.filterAction)
142 }
143 e.qc.reportTimingForCurrentBatch(e.stream, &e.start, foreignTableFilterEvalTiming)
144 }, "filters", e.stream)
145
146 if e.qc.OOPK.geoIntersection != nil {
147 // allocate two predicate vector for geo intersect
148 numWords := (e.qc.OOPK.geoIntersection.numShapes + 31) / 32
149 e.qc.OOPK.currentBatch.geoPredicateVectorD = deviceAllocate(e.qc.OOPK.currentBatch.size*4*numWords, e.qc.Device)
150 }
151
152 e.sizeBeforeGeoFilter = e.qc.OOPK.currentBatch.size
153 e.qc.doProfile(func() {
154 if e.qc.OOPK.geoIntersection != nil {
155 pointColumnIndex := e.qc.TableScanners[e.qc.OOPK.geoIntersection.pointTableID].
156 ColumnsByIDs[e.qc.OOPK.geoIntersection.pointColumnID]
157 e.qc.OOPK.currentBatch.geoIntersect(
158 e.qc.OOPK.geoIntersection,
159 pointColumnIndex,
160 e.qc.OOPK.foreignTables,
161 e.qc.OOPK.currentBatch.geoPredicateVectorD,
162 e.stream, e.qc.Device)
163 }
164 e.qc.reportTimingForCurrentBatch(e.stream, &e.start, geoIntersectEvalTiming)
165 }, "geo_intersect", e.stream)
166}
167
168// evalMeasures is to fill measure values
169func (e *BatchExecutorImpl) evalMeasures() {

Callers

nothing calls this directly

Calls 6

deviceAllocateFunction · 0.85
doProfileMethod · 0.80
processExpressionMethod · 0.80
geoIntersectMethod · 0.80

Tested by

no test coverage detected