(memStore memstore.MemStore, shardOwner topology.ShardOwner, deviceManager *query.DeviceManager, aqlRequest apiCom.AQLRequest, aqlQuery queryCom.AQLQuery)
| 237 | } |
| 238 | |
| 239 | func handleQuery(memStore memstore.MemStore, shardOwner topology.ShardOwner, deviceManager *query.DeviceManager, aqlRequest apiCom.AQLRequest, aqlQuery queryCom.AQLQuery) (qc *query.AQLQueryContext, statusCode int) { |
| 240 | qc = &query.AQLQueryContext{ |
| 241 | Query: &aqlQuery, |
| 242 | ReturnHLLData: aqlRequest.Accept == utils.HTTPContentTypeHyperLogLog, |
| 243 | DataOnly: aqlRequest.DataOnly != 0, |
| 244 | } |
| 245 | qc.Compile(memStore, shardOwner) |
| 246 | |
| 247 | for tableName := range qc.TableSchemaByName { |
| 248 | utils.GetRootReporter().GetChildCounter(map[string]string{ |
| 249 | "table": tableName, |
| 250 | }, utils.QueryReceived).Inc(1) |
| 251 | } |
| 252 | |
| 253 | if aqlRequest.Debug > 0 || aqlRequest.Profiling != "" { |
| 254 | qc.Debug = true |
| 255 | aqlRequest.Verbose = 1 |
| 256 | |
| 257 | } |
| 258 | qc.Profiling = aqlRequest.Profiling |
| 259 | |
| 260 | // Compilation error, should be bad request |
| 261 | if qc.Error != nil { |
| 262 | statusCode = http.StatusBadRequest |
| 263 | return |
| 264 | } |
| 265 | |
| 266 | // Find a device that meets the resource requirement of this query |
| 267 | // Use query specified device as hint |
| 268 | qc.FindDeviceForQuery(memStore, aqlRequest.Device, deviceManager, aqlRequest.DeviceChoosingTimeout) |
| 269 | // Unable to find a device for the query. |
| 270 | if qc.Error != nil { |
| 271 | // Unable to fulfill this request due to resource not available, clients need to try sometimes later. |
| 272 | statusCode = http.StatusServiceUnavailable |
| 273 | return |
| 274 | } |
| 275 | defer deviceManager.ReleaseReservedMemory(qc.Device, qc.Query) |
| 276 | // Execute. |
| 277 | qc.ProcessQuery(memStore) |
| 278 | if qc.Error != nil { |
| 279 | utils.GetQueryLogger().With( |
| 280 | "error", qc.Error, |
| 281 | "query", aqlQuery, |
| 282 | "context", qc, |
| 283 | ).Error("Error happened when processing query") |
| 284 | statusCode = http.StatusInternalServerError |
| 285 | } else { |
| 286 | // Report |
| 287 | utils.GetRootReporter().GetChildCounter(map[string]string{ |
| 288 | "table": aqlQuery.Table, |
| 289 | }, utils.QueryRowsReturned).Inc(int64(qc.ResultsRowsFlushed())) |
| 290 | } |
| 291 | return |
| 292 | } |
| 293 | |
| 294 | func getReponseWriter(returnHLL bool, nQueries int) QueryResponseWriter { |
| 295 | if returnHLL { |
no test coverage detected