Implements #addPoint by storing a value with a specific flag. @param timestamp The timestamp to associate with the value. @param value The value to store. @param flags Flags to store in the qualifier (size and type of the data point). @return A deferred object that indicates the completion of the request.
(final long timestamp,
final byte[] value, final short flags)
| 271 | * @return A deferred object that indicates the completion of the request. |
| 272 | */ |
| 273 | private Deferred<Object> addPointInternal(final long timestamp, |
| 274 | final byte[] value, final short flags) { |
| 275 | if (row == null) { |
| 276 | throw new IllegalStateException("setSeries() never called!"); |
| 277 | } |
| 278 | final boolean ms_timestamp = (timestamp & Const.SECOND_MASK) != 0; |
| 279 | |
| 280 | // we only accept unix epoch timestamps in seconds or milliseconds |
| 281 | if (timestamp < 0 || (ms_timestamp && timestamp > 9999999999999L)) { |
| 282 | throw new IllegalArgumentException((timestamp < 0 ? "negative " : "bad") |
| 283 | + " timestamp=" + timestamp + " when trying to add value=" |
| 284 | + Arrays.toString(value) + " to " + this); |
| 285 | } |
| 286 | |
| 287 | // always maintain last_ts in milliseconds |
| 288 | if ((ms_timestamp ? timestamp : timestamp * 1000) <= last_ts) { |
| 289 | if (allow_out_of_order_data) { |
| 290 | // as we don't want to perform any funky calculations to find out if |
| 291 | // we're still in the same time range, just pass it off to the regular |
| 292 | // TSDB add function. |
| 293 | return tsdb.addPointInternal(metric, timestamp, value, tags, flags); |
| 294 | } else { |
| 295 | throw new IllegalArgumentException("New timestamp=" + timestamp |
| 296 | + " is less than or equal to previous=" + last_ts |
| 297 | + " when trying to add value=" + Arrays.toString(value) + " to " |
| 298 | + this); |
| 299 | } |
| 300 | } |
| 301 | |
| 302 | /** Callback executed for chaining filter calls to see if the value |
| 303 | * should be written or not. */ |
| 304 | final class WriteCB implements Callback<Deferred<Object>, Boolean> { |
| 305 | @Override |
| 306 | public Deferred<Object> call(final Boolean allowed) throws Exception { |
| 307 | if (!allowed) { |
| 308 | return Deferred.fromResult(null); |
| 309 | } |
| 310 | |
| 311 | |
| 312 | last_ts = (ms_timestamp ? timestamp : timestamp * 1000); |
| 313 | |
| 314 | long base_time = baseTime(); |
| 315 | long incoming_base_time; |
| 316 | if (ms_timestamp) { |
| 317 | // drop the ms timestamp to seconds to calculate the base timestamp |
| 318 | incoming_base_time = ((timestamp / 1000) - ((timestamp / 1000) % Const.MAX_TIMESPAN)); |
| 319 | } else { |
| 320 | incoming_base_time = (timestamp - (timestamp % Const.MAX_TIMESPAN)); |
| 321 | } |
| 322 | |
| 323 | if (incoming_base_time - base_time >= Const.MAX_TIMESPAN) { |
| 324 | // Need to start a new row as we've exceeded Const.MAX_TIMESPAN. |
| 325 | base_time = updateBaseTime((ms_timestamp ? timestamp / 1000 : timestamp)); |
| 326 | } |
| 327 | |
| 328 | // Java is so stupid with its auto-promotion of int to float. |
| 329 | final byte[] qualifier = Internal.buildQualifier(timestamp, flags); |
| 330 |
no test coverage detected