(ArrayList<ArrayList<KeyValue>> rows)
| 370 | } |
| 371 | |
| 372 | @Override |
| 373 | public Object call(ArrayList<ArrayList<KeyValue>> rows) |
| 374 | throws Exception { |
| 375 | if (rows == null) { |
| 376 | result.callback(null); |
| 377 | return null; |
| 378 | } |
| 379 | |
| 380 | for (final ArrayList<KeyValue> row : rows) { |
| 381 | try { |
| 382 | final byte[] tsuid = UniqueId.getTSUIDFromKey(row.get(0).key(), |
| 383 | TSDB.metrics_width(), Const.TIMESTAMP_BYTES); |
| 384 | |
| 385 | // if the current tsuid is the same as the last, just continue |
| 386 | // so we save time |
| 387 | if (last_tsuid != null && Arrays.equals(last_tsuid, tsuid)) { |
| 388 | continue; |
| 389 | } |
| 390 | last_tsuid = tsuid; |
| 391 | |
| 392 | // see if we've already processed this tsuid and if so, continue |
| 393 | if (processed_tsuids.contains(Arrays.hashCode(tsuid))) { |
| 394 | continue; |
| 395 | } |
| 396 | tsuid_string = UniqueId.uidToString(tsuid); |
| 397 | |
| 398 | /** |
| 399 | * An error callback used to catch issues with a particular timeseries |
| 400 | * or UIDMeta such as a missing UID name. We want to continue |
| 401 | * processing when this happens so we'll just log the error and |
| 402 | * the user can issue a command later to clean up orphaned meta |
| 403 | * entries. |
| 404 | */ |
| 405 | final class RowErrBack implements Callback<Object, Exception> { |
| 406 | @Override |
| 407 | public Object call(Exception e) throws Exception { |
| 408 | Throwable ex = e; |
| 409 | while (ex.getClass().equals(DeferredGroupException.class)) { |
| 410 | if (ex.getCause() == null) { |
| 411 | LOG.warn("Unable to get to the root cause of the DGE"); |
| 412 | break; |
| 413 | } |
| 414 | ex = ex.getCause(); |
| 415 | } |
| 416 | if (ex.getClass().equals(IllegalStateException.class)) { |
| 417 | LOG.error("Invalid data when processing TSUID [" + |
| 418 | tsuid_string + "]: " + ex.getMessage()); |
| 419 | } else if (ex.getClass().equals(IllegalArgumentException.class)) { |
| 420 | LOG.error("Invalid data when processing TSUID [" + |
| 421 | tsuid_string + "]: " + ex.getMessage()); |
| 422 | } else if (ex.getClass().equals(NoSuchUniqueId.class)) { |
| 423 | LOG.warn("Timeseries [" + tsuid_string + |
| 424 | "] includes a non-existant UID: " + ex.getMessage()); |
| 425 | } else { |
| 426 | LOG.error("Unknown exception processing row: " + row, ex); |
| 427 | } |
| 428 | return null; |
| 429 | } |
nothing calls this directly
no test coverage detected