Merge pull request #28353 from ClickHouse/another-try-to-fix-BackgroundPoolTask-decrement

Another try to fix BackgroundPoolTask decrement.
This commit is contained in:
Nikolai Kochetov 2021-08-31 11:34:26 +03:00 committed by GitHub
commit 8143c2e2b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -84,28 +84,55 @@ bool incrementMetricIfLessThanMax(std::atomic<Int64> & atomic_value, Int64 max_v
} }
/// RAII guard whose sole job is to decrement a CurrentMetrics counter exactly once.
/// Added because, even after previous fixes, a bug with non-executing merges reappeared.
/// Last hypothesis: a task was successfully added to the pool but never executed due to
/// an internal exception, leaving the metric permanently incremented.
class ParanoidMetricDecrementor
{
public:
    explicit ParanoidMetricDecrementor(CurrentMetrics::Metric metric_) : metric(metric_) {}

    /// Destructor is the safety net: if nobody called decrement() explicitly,
    /// an armed guard still releases the metric here.
    ~ParanoidMetricDecrementor() { decrement(); }

    /// Arm the guard: from now on, exactly one decrement will happen.
    void alarm() { is_alarmed.store(true); }

    /// Decrement the metric if armed. exchange(false) makes this idempotent:
    /// only the caller that flips the flag performs the decrement.
    void decrement()
    {
        const bool was_armed = is_alarmed.exchange(false);
        if (was_armed)
            --CurrentMetrics::values[metric];
    }

private:
    CurrentMetrics::Metric metric;
    std::atomic_bool is_alarmed = false;
};
void IBackgroundJobExecutor::execute(JobAndPool job_and_pool) void IBackgroundJobExecutor::execute(JobAndPool job_and_pool)
try try
{ {
auto & pool_config = pools_configs[job_and_pool.pool_type]; auto & pool_config = pools_configs[job_and_pool.pool_type];
const auto max_pool_size = pool_config.get_max_pool_size(); const auto max_pool_size = pool_config.get_max_pool_size();
auto metric_decrementor = std::make_shared<ParanoidMetricDecrementor>(pool_config.tasks_metric);
/// If corresponding pool is not full increment metric and assign new job /// If corresponding pool is not full increment metric and assign new job
if (incrementMetricIfLessThanMax(CurrentMetrics::values[pool_config.tasks_metric], max_pool_size)) if (incrementMetricIfLessThanMax(CurrentMetrics::values[pool_config.tasks_metric], max_pool_size))
{ {
metric_decrementor->alarm();
try /// this try required because we have to manually decrement metric try /// this try required because we have to manually decrement metric
{ {
/// Synchronize pool size, because config could be reloaded /// Synchronize pool size, because config could be reloaded
pools[job_and_pool.pool_type].setMaxThreads(max_pool_size); pools[job_and_pool.pool_type].setMaxThreads(max_pool_size);
pools[job_and_pool.pool_type].setQueueSize(max_pool_size); pools[job_and_pool.pool_type].setQueueSize(max_pool_size);
pools[job_and_pool.pool_type].scheduleOrThrowOnError([this, pool_config, job{std::move(job_and_pool.job)}] () pools[job_and_pool.pool_type].scheduleOrThrowOnError([this, metric_decrementor, job{std::move(job_and_pool.job)}] ()
{ {
try /// We don't want exceptions in background pool try /// We don't want exceptions in background pool
{ {
bool job_success = job(); bool job_success = job();
/// Job done, decrement metric and reset no_work counter /// Job done, decrement metric and reset no_work counter
CurrentMetrics::values[pool_config.tasks_metric]--; metric_decrementor->decrement();
if (job_success) if (job_success)
{ {
@ -121,7 +148,7 @@ try
} }
catch (...) catch (...)
{ {
CurrentMetrics::values[pool_config.tasks_metric]--; metric_decrementor->decrement();
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(__PRETTY_FUNCTION__);
scheduleTask(/* with_backoff = */ true); scheduleTask(/* with_backoff = */ true);
} }
@ -133,7 +160,7 @@ try
catch (...) catch (...)
{ {
/// With our Pool settings scheduleOrThrowOnError shouldn't throw exceptions, but for safety catch added here /// With our Pool settings scheduleOrThrowOnError shouldn't throw exceptions, but for safety catch added here
CurrentMetrics::values[pool_config.tasks_metric]--; metric_decrementor->decrement();
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(__PRETTY_FUNCTION__);
scheduleTask(/* with_backoff = */ true); scheduleTask(/* with_backoff = */ true);
} }