Perform the compaction. @return A Deferred if the compaction processed required a write to HBase, otherwise null.
()
| 338 | * to HBase, otherwise {@code null}. |
| 339 | */ |
| 340 | public Deferred<Object> compact() { |
| 341 | // no columns in row, nothing to do |
| 342 | if (nkvs == 0) { |
| 343 | return null; |
| 344 | } |
| 345 | |
| 346 | compactedKVTimestamp = Long.MIN_VALUE; |
| 347 | // go through all the columns, process annotations, and |
| 348 | heap = new PriorityQueue<ColumnDatapointIterator>(nkvs); |
| 349 | int tot_values = buildHeapProcessAnnotations(); |
| 350 | |
| 351 | // if there are no datapoints or only one that needs no fixup, we are done |
| 352 | if (noMergesOrFixups()) { |
| 353 | // return the single non-annotation entry if requested |
| 354 | if (compacted != null && heap.size() == 1) { |
| 355 | compacted[0] = findFirstDatapointColumn(); |
| 356 | } |
| 357 | return null; |
| 358 | } |
| 359 | |
| 360 | // merge the datapoints, ordered by timestamp and removing duplicates |
| 361 | final ByteBufferList compacted_qual = new ByteBufferList(tot_values); |
| 362 | final ByteBufferList compacted_val = new ByteBufferList(tot_values); |
| 363 | compaction_count.incrementAndGet(); |
| 364 | mergeDatapoints(compacted_qual, compacted_val); |
| 365 | |
| 366 | // if we wound up with no data in the compacted column, we are done |
| 367 | if (compacted_qual.segmentCount() == 0) { |
| 368 | return null; |
| 369 | } |
| 370 | |
| 371 | // build the compacted columns |
| 372 | final KeyValue compact = buildCompactedColumn(compacted_qual, compacted_val); |
| 373 | |
| 374 | final boolean write = updateDeletesCheckForWrite(compact); |
| 375 | |
| 376 | if (compacted != null) { // Caller is interested in the compacted form. |
| 377 | compacted[0] = compact; |
| 378 | final long base_time = Bytes.getUnsignedInt(compact.key(), |
| 379 | Const.SALT_WIDTH() + metric_width); |
| 380 | final long cut_off = System.currentTimeMillis() / 1000 |
| 381 | - Const.MAX_TIMESPAN - 1; |
| 382 | if (base_time > cut_off) { // If row is too recent... |
| 383 | return null; // ... Don't write back compacted. |
| 384 | } |
| 385 | } |
| 386 | // if compactions aren't enabled or there is nothing to write, we're done |
| 387 | if (!tsdb.config.enable_compactions() || (!write && to_delete.isEmpty())) { |
| 388 | return null; |
| 389 | } |
| 390 | |
| 391 | final byte[] key = compact.key(); |
| 392 | //LOG.debug("Compacting row " + Arrays.toString(key)); |
| 393 | deleted_cells.addAndGet(to_delete.size()); // We're going to delete this. |
| 394 | if (write) { |
| 395 | written_cells.incrementAndGet(); |
| 396 | Deferred<Object> deferred = tsdb.put(key, compact.qualifier(), compact.value(), compactedKVTimestamp); |
| 397 | if (!to_delete.isEmpty()) { |
no test coverage detected