MCPcopy
hub / github.com/uber/aresdb / prepareForGeoIntersect

Method prepareForGeoIntersect

query/aql_processor.go:325–387  ·  view source on GitHub ↗
(memStore memstore.MemStore)

Source from the content-addressed store, hash-verified

323}
324
325func (qc *AQLQueryContext) prepareForGeoIntersect(memStore memstore.MemStore) (shapeExists bool) {
326 tableScanner := qc.TableScanners[qc.OOPK.geoIntersection.shapeTableID]
327 shapeColumnID := qc.OOPK.geoIntersection.shapeColumnID
328 tableName := tableScanner.Schema.Schema.Name
329 // geo table is not sharded
330 shard, err := memStore.GetTableShard(tableName, 0)
331 if err != nil {
332 qc.Error = utils.StackError(err, "Failed to get shard for table %s, shard: %d", tableName, 0)
333 return
334 }
335 defer shard.Users.Done()
336
337 numPointsPerShape := make([]int32, 0, len(qc.OOPK.geoIntersection.shapeUUIDs))
338 qc.OOPK.geoIntersection.validShapeUUIDs = make([]string, 0, len(qc.OOPK.geoIntersection.shapeUUIDs))
339 var shapesLats, shapesLongs []float32
340 var numPoints, totalNumPoints int
341 for _, uuid := range qc.OOPK.geoIntersection.shapeUUIDs {
342 recordID, found := shard.LiveStore.LookupKey([]string{uuid})
343 if found {
344 batch := shard.LiveStore.GetBatchForRead(recordID.BatchID)
345 if batch != nil {
346 shapeValue := batch.GetDataValue(int(recordID.Index), shapeColumnID)
347 // compiler should have verified the geo column GeoShape type
348 shapesLats, shapesLongs, numPoints = getGeoShapeLatLongSlice(shapesLats, shapesLongs, *(shapeValue.GoVal.(*memCom.GeoShapeGo)))
349 if numPoints > 0 {
350 totalNumPoints += numPoints
351 numPointsPerShape = append(numPointsPerShape, int32(numPoints))
352 qc.OOPK.geoIntersection.validShapeUUIDs = append(qc.OOPK.geoIntersection.validShapeUUIDs, uuid)
353 shapeExists = true
354 }
355 batch.RUnlock()
356 }
357 }
358 }
359
360 if !shapeExists {
361 return
362 }
363
364 numValidShapes := len(numPointsPerShape)
365 shapeIndexs := make([]uint8, totalNumPoints)
366 pointIndex := 0
367 for shapeIndex, numPoints := range numPointsPerShape {
368 for i := 0; i < int(numPoints); i++ {
369 shapeIndexs[pointIndex] = uint8(shapeIndex)
370 pointIndex++
371 }
372 }
373
374 // allocate memory for lats, longs (float32) and numPoints (int32) device vectors
375 latsPtrD := deviceAllocate(totalNumPoints*4*2+totalNumPoints, qc.Device)
376 longsPtrD := latsPtrD.offset(totalNumPoints * 4)
377 shapeIndexsD := longsPtrD.offset(totalNumPoints * 4)
378
379 cgoutils.AsyncCopyHostToDevice(latsPtrD.getPointer(), unsafe.Pointer(&shapesLats[0]), totalNumPoints*4, qc.cudaStreams[0], qc.Device)
380 cgoutils.AsyncCopyHostToDevice(longsPtrD.getPointer(), unsafe.Pointer(&shapesLongs[0]), totalNumPoints*4, qc.cudaStreams[0], qc.Device)
381 cgoutils.AsyncCopyHostToDevice(shapeIndexsD.getPointer(), unsafe.Pointer(&shapeIndexs[0]), totalNumPoints, qc.cudaStreams[0], qc.Device)
382

Callers 1

ProcessQueryMethod · 0.95

Calls 12

StackErrorFunction · 0.92
AsyncCopyHostToDeviceFunction · 0.92
getGeoShapeLatLongSliceFunction · 0.85
deviceAllocateFunction · 0.85
LookupKeyMethod · 0.80
offsetMethod · 0.80
getPointerMethod · 0.80
GetTableShardMethod · 0.65
DoneMethod · 0.65
GetDataValueMethod · 0.65
RUnlockMethod · 0.65
GetBatchForReadMethod · 0.45

Tested by

no test coverage detected