From 9257188512ad886f91c32de98e005cd9fb15fee5 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Fri, 3 Sep 2021 00:28:29 +0000 Subject: [PATCH] Comments --- src/Common/RingBuffer.h | 4 ++- .../MergeTree/MergeMutateExecutor.cpp | 29 +++++++++++++++---- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/src/Common/RingBuffer.h b/src/Common/RingBuffer.h index fb208307c3b..d8e0ae74156 100644 --- a/src/Common/RingBuffer.h +++ b/src/Common/RingBuffer.h @@ -13,7 +13,8 @@ namespace DB */ template -class RingBuffer { +class RingBuffer +{ public: explicit RingBuffer(size_t capacity_) : capacity(capacity_) { @@ -51,6 +52,7 @@ public: return true; } + /// In case of T = std::shared_ptr it won't cause any allocations template void eraseAll(Predicate && predicate) { diff --git a/src/Storages/MergeTree/MergeMutateExecutor.cpp b/src/Storages/MergeTree/MergeMutateExecutor.cpp index 9178ba85452..e2c65a0916b 100644 --- a/src/Storages/MergeTree/MergeMutateExecutor.cpp +++ b/src/Storages/MergeTree/MergeMutateExecutor.cpp @@ -77,7 +77,7 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction() try { /// This is needed to increase / decrease the number of threads at runtime - if (update_timer.compareAndRestartDeferred(1.)) + if (update_timer.compareAndRestartDeferred(10.)) updateConfiguration(); } catch (...) @@ -90,11 +90,16 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction() { setThreadName(name.c_str()); - auto check_if_deleting = [&] () -> bool + /// Storage may want to destroy and it needs to finish all task related to it. + /// But checking causes some interaction with storage methods, for example it calls getStorageID. + /// So, we must execute this checking once, signal another thread that we are finished and be destroyed. + /// Not doing any complex stuff, especially interaction with Storage... + /// Calling this check twice may cause segfault. + auto check_if_currently_deleting = [&] () -> bool { active.eraseAll([&] (auto x) { return x == item; }); - for (auto & id : currently_deleting) + for (const auto & id : currently_deleting) { if (item->task->getStorageID() == id) { @@ -106,9 +111,14 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction() return false; }; + bool checked{false}; + SCOPE_EXIT({ + if (checked) + return; + checked = true; std::lock_guard guard(mutex); - check_if_deleting(); + check_if_currently_deleting(); }); try @@ -117,7 +127,7 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction() { std::lock_guard guard(mutex); - if (check_if_deleting()) + if (check_if_currently_deleting()) return; pending.tryPush(item); @@ -125,6 +135,9 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction() return; } + /// In a situation of a lack of memory this method can throw an exception, + /// because it may interact somehow with BackgroundSchedulePool, which may allocate memory + /// But it is rather safe, because we have try...catch block here, and another one in ThreadPool. item->task->onCompleted(); std::lock_guard guard(mutex); @@ -132,9 +145,13 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction() } catch(...) { - item->task->onCompleted(); std::lock_guard guard(mutex); has_tasks.notify_one(); + try + { + item->task->onCompleted(); + } + catch (...) {} tryLogCurrentException(__PRETTY_FUNCTION__); }