MCPcopy
hub / github.com/OpenTSDB/opentsdb / processRow

Method processRow

src/core/SaltScanner.java:734–866  ·  view source on GitHub ↗

Finds or creates the span for this row, compacts it and stores it. Also fires off a delete request for the row if told to. @param key The row key to use for fetching the span @param row The row to add

(final byte[] key, final ArrayList<KeyValue> row)

Source from the content-addressed store, hash-verified

732 * @param row The row to add
733 */
734 void processRow(final byte[] key, final ArrayList<KeyValue> row) {
735 ++rows_post_filter;
736 if (delete) {
737 final DeleteRequest del = new DeleteRequest(tsdb.dataTable(), key);
738 tsdb.getClient().delete(del);
739 }
740
741 List<HistogramDataPoint> hists = new ArrayList<HistogramDataPoint>();
742
743 //TODO rollup doesn't use the column qualifier prefix right now
744 //Please move this logic to @CompactionQueue.compact API, if the
745 //qualifier prefix is set for rollup. Right now there is no way to
746 //identify whether a cell belong to rollup or default data table
747 //from the KeyValue/Hbase cell object
748 if (RollupQuery.isValidQuery(rollup_query)) {
749 //It is the rollup search result and rollup cells will not be
750 //compacted, so don't need to worry about complex or trivial
751 //compactions. It just need to consider the cells are different key
752 //values
753 for (KeyValue kv:row) {
754 final byte[] qual = kv.qualifier();
755
756 if (qual.length > 0) {
757 // TODO - allow rollups for annotations and histos? Probably will
758 // want to encode those on 4 bytes or something
759 if (!is_rollup && qual[0] == Annotation.PREFIX()) {
760 // This could be a row with only an annotation in it
761 final Annotation note = JSON.parseToObject(kv.value(),
762 Annotation.class);
763 synchronized (annotations) {
764 List<Annotation> map_notes = annotations.get(key);
765 if (map_notes == null) {
766 map_notes = new ArrayList<Annotation>();
767 annotations.put(key, map_notes);
768 }
769 map_notes.add(note);
770 }
771 } else if (!is_rollup && qual[0] == HistogramDataPoint.PREFIX) {
772 try {
773 HistogramDataPoint histogram =
774 Internal.decodeHistogramDataPoint(tsdb, kv);
775 hists.add(histogram);
776 } catch (Throwable t) {
777 LOG.error("Failed to decode histogram data point", t);
778 }
779 } else {
780 if (rollup_query.getRollupAgg() == Aggregators.AVG ||
781 rollup_query.getRollupAgg() == Aggregators.DEV) {
782 if (qual[0] == (byte) rollup_agg_id ||
783 qual[0] == (byte) rollup_count_id ||
784 Bytes.memcmp(RollupQuery.SUM, qual, 0, RollupQuery.SUM.length) == 0 ||
785 Bytes.memcmp(RollupQuery.COUNT, qual, 0, RollupQuery.COUNT.length) == 0) {
786 kvs.add(kv);
787 }
788 } else if (qual[0] == (byte) rollup_agg_id ||
789 Bytes.memcmp(rollup_query.getRollupAggPrefix(),
790 qual, 0, rollup_query.getRollupAggPrefix().length) == 0) {
791 kvs.add(kv);

Callers 2

callMethod · 0.95
callMethod · 0.80

Calls 15

isValidQueryMethod · 0.95
PREFIXMethod · 0.95
parseToObjectMethod · 0.95
inMillisecondsMethod · 0.95
getQualifierLengthMethod · 0.95
nanoTimeMethod · 0.95
closeMethod · 0.95
dataTableMethod · 0.80
putMethod · 0.80
errorMethod · 0.80

Tested by

no test coverage detected