(final Boolean allowed)
| 303 | * should be written or not. */ |
| 304 | final class WriteCB implements Callback<Deferred<Object>, Boolean> { |
| 305 | @Override |
| 306 | public Deferred<Object> call(final Boolean allowed) throws Exception { |
| 307 | if (!allowed) { |
| 308 | return Deferred.fromResult(null); |
| 309 | } |
| 310 | |
| 311 | |
| 312 | last_ts = (ms_timestamp ? timestamp : timestamp * 1000); |
| 313 | |
| 314 | long base_time = baseTime(); |
| 315 | long incoming_base_time; |
| 316 | if (ms_timestamp) { |
| 317 | // drop the ms timestamp to seconds to calculate the base timestamp |
| 318 | incoming_base_time = ((timestamp / 1000) - ((timestamp / 1000) % Const.MAX_TIMESPAN)); |
| 319 | } else { |
| 320 | incoming_base_time = (timestamp - (timestamp % Const.MAX_TIMESPAN)); |
| 321 | } |
| 322 | |
| 323 | if (incoming_base_time - base_time >= Const.MAX_TIMESPAN) { |
| 324 | // Need to start a new row as we've exceeded Const.MAX_TIMESPAN. |
| 325 | base_time = updateBaseTime((ms_timestamp ? timestamp / 1000 : timestamp)); |
| 326 | } |
| 327 | |
| 328 | // Java is so stupid with its auto-promotion of int to float. |
| 329 | final byte[] qualifier = Internal.buildQualifier(timestamp, flags); |
| 330 | |
| 331 | // TODO(tsuna): The following timing is rather useless. First of all, |
| 332 | // the histogram never resets, so it tends to converge to a certain |
| 333 | // distribution and never changes. What we really want is a moving |
| 334 | // histogram so we can see how the latency distribution varies over time. |
| 335 | // The other problem is that the Histogram class isn't thread-safe and |
| 336 | // here we access it from a callback that runs in an unknown thread, so |
| 337 | // we might miss some increments. So let's comment this out until we |
| 338 | // have a proper thread-safe moving histogram. |
| 339 | // final long start_put = System.nanoTime(); |
| 340 | // final Callback<Object, Object> cb = new Callback<Object, Object>() { |
| 341 | // public Object call(final Object arg) { |
| 342 | // putlatency.add((int) ((System.nanoTime() - start_put) / 1000000)); |
| 343 | // return arg; |
| 344 | // } |
| 345 | // public String toString() { |
| 346 | // return "time put request"; |
| 347 | // } |
| 348 | // }; |
| 349 | |
| 350 | // TODO(tsuna): Add an errback to handle some error cases here. |
| 351 | if (tsdb.getConfig().enable_appends()) { |
| 352 | final AppendDataPoints kv = new AppendDataPoints(qualifier, value); |
| 353 | final AppendRequest point = new AppendRequest(tsdb.table, row, TSDB.FAMILY, |
| 354 | AppendDataPoints.APPEND_COLUMN_QUALIFIER, kv.getBytes()); |
| 355 | point.setDurable(!batch_import); |
| 356 | return tsdb.client.append(point);/* .addBoth(cb) */ |
| 357 | } else { |
| 358 | final PutRequest point = RequestBuilder.buildPutRequest(tsdb.getConfig(), tsdb.table, row, TSDB.FAMILY, |
| 359 | qualifier, value, timestamp); |
| 360 | point.setDurable(!batch_import); |
| 361 | return tsdb.client.put(point)/* .addBoth(cb) */; |
| 362 | } |
nothing calls this directly
no test coverage detected