after merge with master

This commit is contained in:
Nikita Mikhaylov 2021-08-31 12:09:35 +00:00
parent f8d4f04294
commit 3027efe860
2 changed files with 31 additions and 10 deletions

View File

@ -6,6 +6,32 @@
namespace DB namespace DB
{ {
/// This is a RAII class which only decrements metric.
/// It is added because after all other fixes a bug non-executing merges was occurred again.
/// Last hypothesis: task was successfully added to pool, however, was not executed because of internal exception in it.
class ParanoidMetricDecrementor
{
public:
explicit ParanoidMetricDecrementor(CurrentMetrics::Metric metric_) : metric(metric_) {}
void alarm() { is_alarmed = true; }
void decrement()
{
if (is_alarmed.exchange(false))
{
CurrentMetrics::values[metric]--;
}
}
~ParanoidMetricDecrementor() { decrement(); }
private:
CurrentMetrics::Metric metric;
std::atomic_bool is_alarmed = false;
};
void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id) void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id)
{ {
std::lock_guard remove_lock(remove_mutex); std::lock_guard remove_lock(remove_mutex);
@ -66,6 +92,9 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
bool res = pool.trySchedule([this, task = current, promise = current_promise] () mutable bool res = pool.trySchedule([this, task = current, promise = current_promise] () mutable
{ {
auto metric_decrementor = std::make_shared<ParanoidMetricDecrementor>(metric);
metric_decrementor->alarm();
auto on_exit = [&] () auto on_exit = [&] ()
{ {
promise->set_value(); promise->set_value();
@ -89,7 +118,7 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
return; return;
} }
decrementTasksCount(); metric_decrementor->decrement();
task->onCompleted(); task->onCompleted();
std::lock_guard guard(mutex); std::lock_guard guard(mutex);
@ -97,7 +126,7 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
} }
catch(...) catch(...)
{ {
decrementTasksCount(); metric_decrementor->decrement();
task->onCompleted(); task->onCompleted();
std::lock_guard guard(mutex); std::lock_guard guard(mutex);
has_tasks.notify_one(); has_tasks.notify_one();

View File

@ -92,7 +92,6 @@ public:
CurrentMetrics::add(metric); CurrentMetrics::add(metric);
tasks.emplace_back(task); tasks.emplace_back(task);
++scheduled_tasks_count;
has_tasks.notify_one(); has_tasks.notify_one();
return true; return true;
} }
@ -123,12 +122,6 @@ private:
pool.setQueueSize(max_threads); pool.setQueueSize(max_threads);
} }
void decrementTasksCount()
{
--scheduled_tasks_count;
CurrentMetrics::sub(metric);
}
void schedulerThreadFunction(); void schedulerThreadFunction();
@ -146,7 +139,6 @@ private:
std::mutex mutex; std::mutex mutex;
std::condition_variable has_tasks; std::condition_variable has_tasks;
std::atomic_size_t scheduled_tasks_count{0};
std::atomic_bool shutdown_suspend{false}; std::atomic_bool shutdown_suspend{false};
ThreadPool pool; ThreadPool pool;