(memStore memstore.MemStore)
| 323 | } |
| 324 | |
| 325 | func (qc *AQLQueryContext) prepareForGeoIntersect(memStore memstore.MemStore) (shapeExists bool) { |
| 326 | tableScanner := qc.TableScanners[qc.OOPK.geoIntersection.shapeTableID] |
| 327 | shapeColumnID := qc.OOPK.geoIntersection.shapeColumnID |
| 328 | tableName := tableScanner.Schema.Schema.Name |
| 329 | // geo table is not sharded |
| 330 | shard, err := memStore.GetTableShard(tableName, 0) |
| 331 | if err != nil { |
| 332 | qc.Error = utils.StackError(err, "Failed to get shard for table %s, shard: %d", tableName, 0) |
| 333 | return |
| 334 | } |
| 335 | defer shard.Users.Done() |
| 336 | |
| 337 | numPointsPerShape := make([]int32, 0, len(qc.OOPK.geoIntersection.shapeUUIDs)) |
| 338 | qc.OOPK.geoIntersection.validShapeUUIDs = make([]string, 0, len(qc.OOPK.geoIntersection.shapeUUIDs)) |
| 339 | var shapesLats, shapesLongs []float32 |
| 340 | var numPoints, totalNumPoints int |
| 341 | for _, uuid := range qc.OOPK.geoIntersection.shapeUUIDs { |
| 342 | recordID, found := shard.LiveStore.LookupKey([]string{uuid}) |
| 343 | if found { |
| 344 | batch := shard.LiveStore.GetBatchForRead(recordID.BatchID) |
| 345 | if batch != nil { |
| 346 | shapeValue := batch.GetDataValue(int(recordID.Index), shapeColumnID) |
| 347 | // compiler should have verified the geo column GeoShape type |
| 348 | shapesLats, shapesLongs, numPoints = getGeoShapeLatLongSlice(shapesLats, shapesLongs, *(shapeValue.GoVal.(*memCom.GeoShapeGo))) |
| 349 | if numPoints > 0 { |
| 350 | totalNumPoints += numPoints |
| 351 | numPointsPerShape = append(numPointsPerShape, int32(numPoints)) |
| 352 | qc.OOPK.geoIntersection.validShapeUUIDs = append(qc.OOPK.geoIntersection.validShapeUUIDs, uuid) |
| 353 | shapeExists = true |
| 354 | } |
| 355 | batch.RUnlock() |
| 356 | } |
| 357 | } |
| 358 | } |
| 359 | |
| 360 | if !shapeExists { |
| 361 | return |
| 362 | } |
| 363 | |
| 364 | numValidShapes := len(numPointsPerShape) |
| 365 | shapeIndexs := make([]uint8, totalNumPoints) |
| 366 | pointIndex := 0 |
| 367 | for shapeIndex, numPoints := range numPointsPerShape { |
| 368 | for i := 0; i < int(numPoints); i++ { |
| 369 | shapeIndexs[pointIndex] = uint8(shapeIndex) |
| 370 | pointIndex++ |
| 371 | } |
| 372 | } |
| 373 | |
| 374 | // allocate memory for lats, longs (float32) and numPoints (int32) device vectors |
| 375 | latsPtrD := deviceAllocate(totalNumPoints*4*2+totalNumPoints, qc.Device) |
| 376 | longsPtrD := latsPtrD.offset(totalNumPoints * 4) |
| 377 | shapeIndexsD := longsPtrD.offset(totalNumPoints * 4) |
| 378 | |
| 379 | cgoutils.AsyncCopyHostToDevice(latsPtrD.getPointer(), unsafe.Pointer(&shapesLats[0]), totalNumPoints*4, qc.cudaStreams[0], qc.Device) |
| 380 | cgoutils.AsyncCopyHostToDevice(longsPtrD.getPointer(), unsafe.Pointer(&shapesLongs[0]), totalNumPoints*4, qc.cudaStreams[0], qc.Device) |
| 381 | cgoutils.AsyncCopyHostToDevice(shapeIndexsD.getPointer(), unsafe.Pointer(&shapeIndexs[0]), totalNumPoints, qc.cudaStreams[0], qc.Device) |
| 382 |
no test coverage detected