(Log log)
| 293 | } |
| 294 | |
| 295 | private synchronized void initialize(Log log) { |
| 296 | initialized = true; |
| 297 | final ServerConfiguration config = bootstrap.getConfiguration(); |
| 298 | final CairoConfiguration cairoConfig = config.getCairoConfiguration(); |
| 299 | // create the worker pool manager, and configure the shared pool |
| 300 | final boolean walSupported = cairoConfig.isWalSupported(); |
| 301 | final boolean isReadOnly = cairoConfig.isReadOnlyInstance(); |
| 302 | final boolean walApplyEnabled = cairoConfig.isWalApplyEnabled(); |
| 303 | |
| 304 | workerPoolManager = new WorkerPoolManager(config) { |
| 305 | @Override |
| 306 | protected void configureWorkerPools(final WorkerPool sharedPoolQuery, final WorkerPool sharedPoolWrite) { |
| 307 | try { |
| 308 | Job engineMaintenanceJob = setupEngineMaintenanceJob(engine); |
| 309 | if (engineMaintenanceJob != null) { |
| 310 | sharedPoolWrite.assign(engineMaintenanceJob); |
| 311 | } |
| 312 | final MemoryConfiguration memoryConfig = config.getMemoryConfiguration(); |
| 313 | sharedPoolWrite.assign(new MemoryUsageLogJob( |
| 314 | cairoConfig.getMicrosecondClock(), |
| 315 | memoryConfig::isMemoryUsageLogEnabled, |
| 316 | memoryConfig::getMemoryUsageLogInterval |
| 317 | )); |
| 318 | WorkerPoolUtils.setupAsyncMunmapJob(sharedPoolQuery, engine); |
| 319 | WorkerPoolUtils.setupQueryJobs(sharedPoolQuery, engine); |
| 320 | |
| 321 | if (!config.getCairoConfiguration().isReadOnlyInstance()) { |
| 322 | QueryTracingJob queryTracingJob = new QueryTracingJob(engine); |
| 323 | sharedPoolQuery.assign(queryTracingJob); |
| 324 | freeOnExit(queryTracingJob); |
| 325 | } |
| 326 | |
| 327 | if (!isReadOnly) { |
| 328 | WorkerPoolUtils.setupWriterJobs(sharedPoolWrite, engine); |
| 329 | |
| 330 | if (walSupported) { |
| 331 | sharedPoolWrite.assign(config.getFactoryProvider().getWalJobFactory().createCheckWalTransactionsJob(engine)); |
| 332 | final WalPurgeJob walPurgeJob = config.getFactoryProvider().getWalJobFactory().createWalPurgeJob(engine); |
| 333 | walPurgeJob.delayByHalfInterval(); |
| 334 | sharedPoolWrite.assign(walPurgeJob); |
| 335 | sharedPoolWrite.freeOnExit(walPurgeJob); |
| 336 | |
| 337 | // wal apply job in the shared pool when there is no dedicated pool |
| 338 | if (walApplyEnabled && !config.getWalApplyPoolConfiguration().isEnabled()) { |
| 339 | setupWalApplyJob(sharedPoolWrite, engine, sharedPoolQuery.getWorkerCount()); |
| 340 | } |
| 341 | } |
| 342 | |
| 343 | // text import |
| 344 | if (!Chars.empty(cairoConfig.getSqlCopyInputRoot())) { |
| 345 | CopyImportJob.assignToPool(engine.getMessageBus(), sharedPoolWrite); |
| 346 | final CopyImportRequestJob copyImportRequestJob = new CopyImportRequestJob( |
| 347 | engine, |
| 348 | // save CPU resources for collecting and processing jobs |
| 349 | Math.max(1, sharedPoolWrite.getWorkerCount() - 2) |
| 350 | ); |
| 351 | sharedPoolWrite.assign(copyImportRequestJob); |
| 352 | sharedPoolWrite.freeOnExit(copyImportRequestJob); |
no test coverage detected