This commit is contained in:
Nikita Mikhaylov 2021-09-03 00:28:29 +00:00
parent 7baad6adde
commit 9257188512
2 changed files with 26 additions and 7 deletions

View File

@ -13,7 +13,8 @@ namespace DB
*/
template <class T>
class RingBuffer {
class RingBuffer
{
public:
explicit RingBuffer(size_t capacity_) : capacity(capacity_)
{
@ -51,6 +52,7 @@ public:
return true;
}
/// In case of T = std::shared_ptr<Something> it won't cause any allocations
template <typename Predicate>
void eraseAll(Predicate && predicate)
{

View File

@ -77,7 +77,7 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
try
{
/// This is needed to increase / decrease the number of threads at runtime
if (update_timer.compareAndRestartDeferred(1.))
if (update_timer.compareAndRestartDeferred(10.))
updateConfiguration();
}
catch (...)
@ -90,11 +90,16 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
{
setThreadName(name.c_str());
auto check_if_deleting = [&] () -> bool
/// Storage may want to destroy and it needs to finish all task related to it.
/// But checking causes some interaction with storage methods, for example it calls getStorageID.
/// So, we must execute this checking once, signal another thread that we are finished and be destroyed.
/// Not doing any complex stuff, especially interaction with Storage...
/// Calling this check twice may cause segfault.
auto check_if_currently_deleting = [&] () -> bool
{
active.eraseAll([&] (auto x) { return x == item; });
for (auto & id : currently_deleting)
for (const auto & id : currently_deleting)
{
if (item->task->getStorageID() == id)
{
@ -106,9 +111,14 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
return false;
};
bool checked{false};
SCOPE_EXIT({
if (checked)
return;
checked = true;
std::lock_guard guard(mutex);
check_if_deleting();
check_if_currently_deleting();
});
try
@ -117,7 +127,7 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
{
std::lock_guard guard(mutex);
if (check_if_deleting())
if (check_if_currently_deleting())
return;
pending.tryPush(item);
@ -125,6 +135,9 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
return;
}
/// In a situation of a lack of memory this method can throw an exception,
/// because it may interact somehow with BackgroundSchedulePool, which may allocate memory
/// But it is rather safe, because we have try...catch block here, and another one in ThreadPool.
item->task->onCompleted();
std::lock_guard guard(mutex);
@ -132,9 +145,13 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
}
catch(...)
{
item->task->onCompleted();
std::lock_guard guard(mutex);
has_tasks.notify_one();
try
{
item->task->onCompleted();
}
catch (...) {}
tryLogCurrentException(__PRETTY_FUNCTION__);
}