From ab45c412d0027f286966d5970c0e3510f1c8a6b1 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 30 Aug 2021 15:45:32 +0300 Subject: [PATCH] Another try to fix BackgroundPoolTask decrement. --- .../MergeTree/BackgroundJobsExecutor.cpp | 35 ++++++++++++++++--- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/src/Storages/MergeTree/BackgroundJobsExecutor.cpp b/src/Storages/MergeTree/BackgroundJobsExecutor.cpp index f3d957117e8..7c784de9ebb 100644 --- a/src/Storages/MergeTree/BackgroundJobsExecutor.cpp +++ b/src/Storages/MergeTree/BackgroundJobsExecutor.cpp @@ -84,28 +84,55 @@ bool incrementMetricIfLessThanMax(std::atomic & atomic_value, Int64 max_v } +/// This is a RAII class which only decrements metric. +/// It is added because after all other fixes a bug non-executing merges was occurred again. +/// Last hypothesis: task was successfully added to pool, however, was not executed because of internal exception in it. +class ParanoidMetricDecrementor +{ +public: + explicit ParanoidMetricDecrementor(CurrentMetrics::Metric metric_) : metric(metric_) {} + void alarm() { is_alarmed = true; } + void decrement() + { + if (is_alarmed.exchange(false)) + { + CurrentMetrics::values[metric]--; + } + } + + ~ParanoidMetricDecrementor() { decrement(); } + +private: + + CurrentMetrics::Metric metric; + std::atomic_bool is_alarmed = false; +}; + void IBackgroundJobExecutor::execute(JobAndPool job_and_pool) try { auto & pool_config = pools_configs[job_and_pool.pool_type]; const auto max_pool_size = pool_config.get_max_pool_size(); + auto metric_decrementor = std::make_shared(pool_config.tasks_metric); + /// If corresponding pool is not full increment metric and assign new job if (incrementMetricIfLessThanMax(CurrentMetrics::values[pool_config.tasks_metric], max_pool_size)) { + metric_decrementor->alarm(); try /// this try required because we have to manually decrement metric { /// Synchronize pool size, because config could be reloaded pools[job_and_pool.pool_type].setMaxThreads(max_pool_size); pools[job_and_pool.pool_type].setQueueSize(max_pool_size); - pools[job_and_pool.pool_type].scheduleOrThrowOnError([this, pool_config, job{std::move(job_and_pool.job)}] () + pools[job_and_pool.pool_type].scheduleOrThrowOnError([this, metric_decrementor, job{std::move(job_and_pool.job)}] () { try /// We don't want exceptions in background pool { bool job_success = job(); /// Job done, decrement metric and reset no_work counter - CurrentMetrics::values[pool_config.tasks_metric]--; + metric_decrementor->decrement(); if (job_success) { @@ -121,7 +148,7 @@ try } catch (...) { - CurrentMetrics::values[pool_config.tasks_metric]--; + metric_decrementor->decrement(); tryLogCurrentException(__PRETTY_FUNCTION__); scheduleTask(/* with_backoff = */ true); } @@ -133,7 +160,7 @@ try catch (...) { /// With our Pool settings scheduleOrThrowOnError shouldn't throw exceptions, but for safety catch added here - CurrentMetrics::values[pool_config.tasks_metric]--; + metric_decrementor->decrement(); tryLogCurrentException(__PRETTY_FUNCTION__); scheduleTask(/* with_backoff = */ true); }