Another try to fix BackgroundPoolTask decrement.

This commit is contained in:
Nikolai Kochetov 2021-08-30 15:45:32 +03:00
parent fb66ab75be
commit ab45c412d0

View File

@ -84,28 +84,55 @@ bool incrementMetricIfLessThanMax(std::atomic<Int64> & atomic_value, Int64 max_v
}
/// This is a RAII class which only decrements metric.
/// It is added because after all other fixes a bug non-executing merges was occurred again.
/// Last hypothesis: task was successfully added to pool, however, was not executed because of internal exception in it.
class ParanoidMetricDecrementor
{
public:
explicit ParanoidMetricDecrementor(CurrentMetrics::Metric metric_) : metric(metric_) {}
void alarm() { is_alarmed = true; }
void decrement()
{
if (is_alarmed.exchange(false))
{
CurrentMetrics::values[metric]--;
}
}
~ParanoidMetricDecrementor() { decrement(); }
private:
CurrentMetrics::Metric metric;
std::atomic_bool is_alarmed = false;
};
void IBackgroundJobExecutor::execute(JobAndPool job_and_pool)
try
{
auto & pool_config = pools_configs[job_and_pool.pool_type];
const auto max_pool_size = pool_config.get_max_pool_size();
auto metric_decrementor = std::make_shared<ParanoidMetricDecrementor>(pool_config.tasks_metric);
/// If corresponding pool is not full increment metric and assign new job
if (incrementMetricIfLessThanMax(CurrentMetrics::values[pool_config.tasks_metric], max_pool_size))
{
metric_decrementor->alarm();
try /// this try required because we have to manually decrement metric
{
/// Synchronize pool size, because config could be reloaded
pools[job_and_pool.pool_type].setMaxThreads(max_pool_size);
pools[job_and_pool.pool_type].setQueueSize(max_pool_size);
pools[job_and_pool.pool_type].scheduleOrThrowOnError([this, pool_config, job{std::move(job_and_pool.job)}] ()
pools[job_and_pool.pool_type].scheduleOrThrowOnError([this, metric_decrementor, job{std::move(job_and_pool.job)}] ()
{
try /// We don't want exceptions in background pool
{
bool job_success = job();
/// Job done, decrement metric and reset no_work counter
CurrentMetrics::values[pool_config.tasks_metric]--;
metric_decrementor->decrement();
if (job_success)
{
@ -121,7 +148,7 @@ try
}
catch (...)
{
CurrentMetrics::values[pool_config.tasks_metric]--;
metric_decrementor->decrement();
tryLogCurrentException(__PRETTY_FUNCTION__);
scheduleTask(/* with_backoff = */ true);
}
@ -133,7 +160,7 @@ try
catch (...)
{
/// With our Pool settings scheduleOrThrowOnError shouldn't throw exceptions, but for safety catch added here
CurrentMetrics::values[pool_config.tasks_metric]--;
metric_decrementor->decrement();
tryLogCurrentException(__PRETTY_FUNCTION__);
scheduleTask(/* with_backoff = */ true);
}