Finds or creates the span for this row, compacts it and stores it. Also fires off a delete request for the row if told to. @param key The row key to use for fetching the span @param row The row to add
(final byte[] key, final ArrayList<KeyValue> row)
| 732 | * @param row The row to add |
| 733 | */ |
| 734 | void processRow(final byte[] key, final ArrayList<KeyValue> row) { |
| 735 | ++rows_post_filter; |
| 736 | if (delete) { |
| 737 | final DeleteRequest del = new DeleteRequest(tsdb.dataTable(), key); |
| 738 | tsdb.getClient().delete(del); |
| 739 | } |
| 740 | |
| 741 | List<HistogramDataPoint> hists = new ArrayList<HistogramDataPoint>(); |
| 742 | |
| 743 | //TODO rollup doesn't use the column qualifier prefix right now |
| 744 | //Please move this logic to @CompactionQueue.compact API, if the |
| 745 | //qualifier prefix is set for rollup. Right now there is no way to |
| 746 | //identify whether a cell belongs to the rollup or default data table |
| 747 | //from the KeyValue/HBase cell object |
| 748 | if (RollupQuery.isValidQuery(rollup_query)) { |
| 749 | //This is a rollup search result and rollup cells will not be |
| 750 | //compacted, so we don't need to worry about complex or trivial |
| 751 | //compactions. We only need to treat the cells as different key |
| 752 | //values |
| 753 | for (KeyValue kv:row) { |
| 754 | final byte[] qual = kv.qualifier(); |
| 755 | |
| 756 | if (qual.length > 0) { |
| 757 | // TODO - allow rollups for annotations and histos? Probably will |
| 758 | // want to encode those on 4 bytes or something |
| 759 | if (!is_rollup && qual[0] == Annotation.PREFIX()) { |
| 760 | // This could be a row with only an annotation in it |
| 761 | final Annotation note = JSON.parseToObject(kv.value(), |
| 762 | Annotation.class); |
| 763 | synchronized (annotations) { |
| 764 | List<Annotation> map_notes = annotations.get(key); |
| 765 | if (map_notes == null) { |
| 766 | map_notes = new ArrayList<Annotation>(); |
| 767 | annotations.put(key, map_notes); |
| 768 | } |
| 769 | map_notes.add(note); |
| 770 | } |
| 771 | } else if (!is_rollup && qual[0] == HistogramDataPoint.PREFIX) { |
| 772 | try { |
| 773 | HistogramDataPoint histogram = |
| 774 | Internal.decodeHistogramDataPoint(tsdb, kv); |
| 775 | hists.add(histogram); |
| 776 | } catch (Throwable t) { |
| 777 | LOG.error("Failed to decode histogram data point", t); |
| 778 | } |
| 779 | } else { |
| 780 | if (rollup_query.getRollupAgg() == Aggregators.AVG || |
| 781 | rollup_query.getRollupAgg() == Aggregators.DEV) { |
| 782 | if (qual[0] == (byte) rollup_agg_id || |
| 783 | qual[0] == (byte) rollup_count_id || |
| 784 | Bytes.memcmp(RollupQuery.SUM, qual, 0, RollupQuery.SUM.length) == 0 || |
| 785 | Bytes.memcmp(RollupQuery.COUNT, qual, 0, RollupQuery.COUNT.length) == 0) { |
| 786 | kvs.add(kv); |
| 787 | } |
| 788 | } else if (qual[0] == (byte) rollup_agg_id || |
| 789 | Bytes.memcmp(rollup_query.getRollupAggPrefix(), |
| 790 | qual, 0, rollup_query.getRollupAggPrefix().length) == 0) { |
| 791 | kvs.add(kv); |
no test coverage detected