Execute the RPC and serialize the response @param query The HTTP query to parse and and return results to
(final HttpQuery query)
| 222 | * @param query The HTTP query to parse and and return results to |
| 223 | */ |
| 224 | public void execute(final HttpQuery query) { |
| 225 | http_query = query; |
| 226 | final QueryStats query_stats = |
| 227 | new QueryStats(query.getRemoteAddress(), ts_query, query.getHeaders()); |
| 228 | ts_query.setQueryStats(query_stats); |
| 229 | |
| 230 | /** |
| 231 | * Sends the serialized results to the caller. This should be the very |
| 232 | * last callback executed. |
| 233 | */ |
| 234 | class CompleteCB implements Callback<Object, ChannelBuffer> { |
| 235 | @Override |
| 236 | public Object call(final ChannelBuffer cb) throws Exception { |
| 237 | query.sendReply(cb); |
| 238 | return null; |
| 239 | } |
| 240 | } |
| 241 | |
| 242 | /** |
| 243 | * After all of the queries have run and we have data (or not) then we |
| 244 | * need to compile the iterators. |
| 245 | * This class could probably be improved: |
| 246 | * First we iterate over the results AND for each result, iterate over |
| 247 | * the expressions, giving a time synced iterator to each expression that |
| 248 | * needs the result set. |
| 249 | * THEN we iterate over the expressions again and build a DAG to determine |
| 250 | * if any of the expressions require the output of an expression. If so |
| 251 | * then we add the expressions to the proper parent and compile them in |
| 252 | * order. |
| 253 | * After all of that we're ready to start serializing and iterating |
| 254 | * over the results. |
| 255 | */ |
| 256 | class QueriesCB implements Callback<Object, ArrayList<DataPoints[]>> { |
| 257 | public Object call(final ArrayList<DataPoints[]> query_results) |
| 258 | throws Exception { |
| 259 | |
| 260 | for (int i = 0; i < query_results.size(); i++) { |
| 261 | final TSSubQuery sub = ts_query.getQueries().get(i); |
| 262 | |
| 263 | Iterator<Entry<String, TSSubQuery>> it = sub_queries.entrySet().iterator(); |
| 264 | while (it.hasNext()) { |
| 265 | final Entry<String, TSSubQuery> entry = it.next(); |
| 266 | if (entry.getValue().equals(sub)) { |
| 267 | sub_query_results.put(entry.getKey(), query_results.get(i)); |
| 268 | if (expressions != null) { |
| 269 | for (final ExpressionIterator ei : expressions.values()) { |
| 270 | if (ei.getVariableNames().contains(entry.getKey())) { |
| 271 | final TimeSyncedIterator tsi = new TimeSyncedIterator( |
| 272 | entry.getKey(), sub.getFilterTagKs(), |
| 273 | query_results.get(i)); |
| 274 | final NumericFillPolicy fill = fills.get(entry.getKey()); |
| 275 | if (fill != null) { |
| 276 | tsi.setFillPolicy(fill); |
| 277 | } |
| 278 | ei.addResults(entry.getKey(), tsi); |
| 279 | if (LOG.isDebugEnabled()) { |
| 280 | LOG.debug("Added results for " + entry.getKey() + |
| 281 | " to " + ei.getId()); |
no test coverage detected