Handles one or more incoming data point types for the HTTP endpoint to put raw, rolled up or aggregated data points @param An IncomingDataPoint class. @param tsdb The TSDB to which we belong @param query The query to respond to @param dps The de-serialized data points @throws BadRequestE
(final TSDB tsdb,
final HttpQuery query, final List<T> dps)
| 307 | * @since 2.4 |
| 308 | */ |
| 309 | public <T extends IncomingDataPoint> void processDataPoint(final TSDB tsdb, |
| 310 | final HttpQuery query, final List<T> dps) { |
| 311 | if (dps.size() < 1) { |
| 312 | throw new BadRequestException("No datapoints found in content"); |
| 313 | } |
| 314 | |
| 315 | final HashMap<String, String> query_tags = new HashMap<String, String>(); |
| 316 | final boolean show_details = query.hasQueryStringParam("details"); |
| 317 | final boolean show_summary = query.hasQueryStringParam("summary"); |
| 318 | final boolean synchronous = query.hasQueryStringParam("sync"); |
| 319 | final int sync_timeout = query.hasQueryStringParam("sync_timeout") ? |
| 320 | Integer.parseInt(query.getQueryStringParam("sync_timeout")) : 0; |
| 321 | // this is used to coordinate timeouts |
| 322 | final AtomicBoolean sending_response = new AtomicBoolean(); |
| 323 | sending_response.set(false); |
| 324 | |
| 325 | final List<Map<String, Object>> details = show_details |
| 326 | ? new ArrayList<Map<String, Object>>() : null; |
| 327 | int queued = 0; |
| 328 | final List<Deferred<Boolean>> deferreds = synchronous ? |
| 329 | new ArrayList<Deferred<Boolean>>(dps.size()) : null; |
| 330 | |
| 331 | if (tsdb.getConfig().enable_header_tag()) { |
| 332 | if (LOG.isDebugEnabled()) { |
| 333 | LOG.debug("Looking for tag header " |
| 334 | + tsdb.getConfig().get_name_header_tag()); |
| 335 | } |
| 336 | final String header_tag_value = query.getHeaderValue( |
| 337 | tsdb.getConfig().get_name_header_tag()) ; |
| 338 | if (header_tag_value != null) { |
| 339 | if (LOG.isDebugEnabled()) { |
| 340 | LOG.debug(" header found with value:" + header_tag_value); |
| 341 | } |
| 342 | Tags.parse(query_tags, header_tag_value); |
| 343 | } else if (LOG.isDebugEnabled()) { |
| 344 | LOG.debug(" no such header in request"); |
| 345 | } |
| 346 | } |
| 347 | |
| 348 | for (final IncomingDataPoint dp : dps) { |
| 349 | final DataPointType type; |
| 350 | if (dp instanceof RollUpDataPoint) { |
| 351 | type = DataPointType.ROLLUP; |
| 352 | rollup_dps.incrementAndGet(); |
| 353 | } else if (dp instanceof HistogramPojo) { |
| 354 | type = DataPointType.HISTOGRAM; |
| 355 | raw_histograms.incrementAndGet(); |
| 356 | } else { |
| 357 | type = DataPointType.PUT; |
| 358 | raw_dps.incrementAndGet(); |
| 359 | } |
| 360 | |
| 361 | /* |
| 362 | Error back callback to handle storage failures |
| 363 | */ |
| 364 | final class PutErrback implements Callback<Boolean, Exception> { |
| 365 | public Boolean call(final Exception arg) { |
| 366 | if (arg instanceof PleaseThrottleException) { |
no test coverage detected