MCPcopy Index your code
hub / github.com/OpenTSDB/opentsdb / execute

Method execute

src/tsd/QueryExecutor.java:224–475  ·  view source on GitHub ↗

Execute the RPC and serialize the response @param query The HTTP query to parse and and return results to

(final HttpQuery query)

Source from the content-addressed store, hash-verified

222 * @param query The HTTP query to parse and and return results to
223 */
224 public void execute(final HttpQuery query) {
225 http_query = query;
226 final QueryStats query_stats =
227 new QueryStats(query.getRemoteAddress(), ts_query, query.getHeaders());
228 ts_query.setQueryStats(query_stats);
229
230 /**
231 * Sends the serialized results to the caller. This should be the very
232 * last callback executed.
233 */
234 class CompleteCB implements Callback<Object, ChannelBuffer> {
235 @Override
236 public Object call(final ChannelBuffer cb) throws Exception {
237 query.sendReply(cb);
238 return null;
239 }
240 }
241
242 /**
243 * After all of the queries have run and we have data (or not) then we
244 * need to compile the iterators.
245 * This class could probably be improved:
246 * First we iterate over the results AND for each result, iterate over
247 * the expressions, giving a time synced iterator to each expression that
248 * needs the result set.
249 * THEN we iterate over the expressions again and build a DAG to determine
250 * if any of the expressions require the output of an expression. If so
251 * then we add the expressions to the proper parent and compile them in
252 * order.
253 * After all of that we're ready to start serializing and iterating
254 * over the results.
255 */
256 class QueriesCB implements Callback<Object, ArrayList<DataPoints[]>> {
257 public Object call(final ArrayList<DataPoints[]> query_results)
258 throws Exception {
259
260 for (int i = 0; i < query_results.size(); i++) {
261 final TSSubQuery sub = ts_query.getQueries().get(i);
262
263 Iterator<Entry<String, TSSubQuery>> it = sub_queries.entrySet().iterator();
264 while (it.hasNext()) {
265 final Entry<String, TSSubQuery> entry = it.next();
266 if (entry.getValue().equals(sub)) {
267 sub_query_results.put(entry.getKey(), query_results.get(i));
268 if (expressions != null) {
269 for (final ExpressionIterator ei : expressions.values()) {
270 if (ei.getVariableNames().contains(entry.getKey())) {
271 final TimeSyncedIterator tsi = new TimeSyncedIterator(
272 entry.getKey(), sub.getFilterTagKs(),
273 query_results.get(i));
274 final NumericFillPolicy fill = fills.get(entry.getKey());
275 if (fill != null) {
276 tsi.setFillPolicy(fill);
277 }
278 ei.addResults(entry.getKey(), tsi);
279 if (LOG.isDebugEnabled()) {
280 LOG.debug("Added results for " + entry.getKey() +
281 " to " + ei.getId());

Callers 1

handleExpressionQueryMethod · 0.95

Calls 4

getRemoteAddressMethod · 0.80
getHeadersMethod · 0.80
setQueryStatsMethod · 0.80
buildQueriesAsyncMethod · 0.80

Tested by

no test coverage detected