calculateForeignTableMemUsage returns how much device memory is needed for foreign table
(memStore memstore.MemStore)
| 1243 | |
| 1244 | // calculateForeignTableMemUsage returns how much device memory is needed for foreign table |
| 1245 | func (qc *AQLQueryContext) calculateForeignTableMemUsage(memStore memstore.MemStore) int { |
| 1246 | var memUsage int |
| 1247 | |
| 1248 | for joinTableID, join := range qc.Query.Joins { |
| 1249 | // join only support dimension table for now |
| 1250 | // and dimension table is not shared |
| 1251 | shard, err := memStore.GetTableShard(join.Table, 0) |
| 1252 | if err != nil { |
| 1253 | qc.Error = utils.StackError(err, "Failed to get shard for table %s, shard: %d", join.Table, 0) |
| 1254 | return 0 |
| 1255 | } |
| 1256 | |
| 1257 | // only need live store for dimension table |
| 1258 | batchIDs, _ := shard.LiveStore.GetBatchIDs() |
| 1259 | |
| 1260 | // primary key |
| 1261 | memUsage += int(shard.LiveStore.PrimaryKey.AllocatedBytes()) |
| 1262 | |
| 1263 | // VPs |
| 1264 | for _, batchID := range batchIDs { |
| 1265 | batch := shard.LiveStore.GetBatchForRead(batchID) |
| 1266 | if batch == nil { |
| 1267 | continue |
| 1268 | } |
| 1269 | |
| 1270 | for _, columnID := range qc.TableScanners[joinTableID+1].Columns { |
| 1271 | usage := qc.TableScanners[joinTableID+1].ColumnUsages[columnID] |
| 1272 | if usage&(columnUsedByAllBatches|columnUsedByLiveBatches) != 0 { |
| 1273 | sourceVP := batch.Columns[columnID] |
| 1274 | if sourceVP == nil { |
| 1275 | continue |
| 1276 | } |
| 1277 | memUsage += int(sourceVP.GetBytes()) |
| 1278 | } |
| 1279 | } |
| 1280 | batch.RUnlock() |
| 1281 | } |
| 1282 | shard.Users.Done() |
| 1283 | } |
| 1284 | |
| 1285 | return memUsage |
| 1286 | } |
| 1287 | |
| 1288 | // FindDeviceForQuery calls device manager to find a device for the query |
| 1289 | func (qc *AQLQueryContext) FindDeviceForQuery(memStore memstore.MemStore, preferredDevice int, |
no test coverage detected