Fix hanging in TemporaryLiveViewCleaner

This commit is contained in:
Vitaly Baranov 2021-04-08 16:13:10 +03:00
parent 03cb0299a9
commit 5bcb7a0fed
3 changed files with 28 additions and 49 deletions

View File

@ -152,7 +152,7 @@ void DatabaseCatalog::loadDatabases()
/// Another background thread which drops temporary LiveViews. /// Another background thread which drops temporary LiveViews.
/// We should start it after loadMarkedAsDroppedTables() to avoid race condition. /// We should start it after loadMarkedAsDroppedTables() to avoid race condition.
TemporaryLiveViewCleaner::instance().startupIfNecessary(); TemporaryLiveViewCleaner::instance().startup();
} }
void DatabaseCatalog::shutdownImpl() void DatabaseCatalog::shutdownImpl()

View File

@ -48,38 +48,13 @@ void TemporaryLiveViewCleaner::init(Context & global_context_)
the_instance.reset(new TemporaryLiveViewCleaner(global_context_)); the_instance.reset(new TemporaryLiveViewCleaner(global_context_));
} }
void TemporaryLiveViewCleaner::startupIfNecessary() void TemporaryLiveViewCleaner::startup()
{ {
background_thread_can_start = true;
std::lock_guard lock{mutex}; std::lock_guard lock{mutex};
if (background_thread_should_exit)
return;
if (!views.empty()) if (!views.empty())
startupIfNecessaryImpl(lock); startBackgroundThread();
else
can_start_background_thread = true;
}
void TemporaryLiveViewCleaner::startupIfNecessaryImpl(const std::lock_guard<std::mutex> &)
{
/// If views.empty() the background thread isn't running or it's going to stop right now.
/// If can_start_background_thread is false, then the thread has not been started previously.
bool background_thread_is_running;
if (can_start_background_thread)
{
background_thread_is_running = !views.empty();
}
else
{
can_start_background_thread = true;
background_thread_is_running = false;
}
if (!background_thread_is_running)
{
if (background_thread.joinable())
background_thread.join();
background_thread = ThreadFromGlobalPool{&TemporaryLiveViewCleaner::backgroundThreadFunc, this};
}
} }
void TemporaryLiveViewCleaner::shutdown() void TemporaryLiveViewCleaner::shutdown()
@ -87,13 +62,11 @@ void TemporaryLiveViewCleaner::shutdown()
the_instance.reset(); the_instance.reset();
} }
TemporaryLiveViewCleaner::TemporaryLiveViewCleaner(Context & global_context_) TemporaryLiveViewCleaner::TemporaryLiveViewCleaner(Context & global_context_)
: global_context(global_context_) : global_context(global_context_)
{ {
} }
TemporaryLiveViewCleaner::~TemporaryLiveViewCleaner() TemporaryLiveViewCleaner::~TemporaryLiveViewCleaner()
{ {
stopBackgroundThread(); stopBackgroundThread();
@ -108,27 +81,29 @@ void TemporaryLiveViewCleaner::addView(const std::shared_ptr<StorageLiveView> &
auto current_time = std::chrono::system_clock::now(); auto current_time = std::chrono::system_clock::now();
auto time_of_next_check = current_time + view->getTimeout(); auto time_of_next_check = current_time + view->getTimeout();
std::lock_guard lock{mutex};
if (background_thread_should_exit)
return;
if (can_start_background_thread)
startupIfNecessaryImpl(lock);
/// Keep the vector `views` sorted by time of next check. /// Keep the vector `views` sorted by time of next check.
StorageAndTimeOfCheck storage_and_time_of_check{view, time_of_next_check}; StorageAndTimeOfCheck storage_and_time_of_check{view, time_of_next_check};
std::lock_guard lock{mutex};
views.insert(std::upper_bound(views.begin(), views.end(), storage_and_time_of_check), storage_and_time_of_check); views.insert(std::upper_bound(views.begin(), views.end(), storage_and_time_of_check), storage_and_time_of_check);
if (background_thread_can_start)
{
startBackgroundThread();
background_thread_wake_up.notify_one(); background_thread_wake_up.notify_one();
}
} }
void TemporaryLiveViewCleaner::backgroundThreadFunc() void TemporaryLiveViewCleaner::backgroundThreadFunc()
{ {
std::unique_lock lock{mutex}; std::unique_lock lock{mutex};
while (!background_thread_should_exit && !views.empty()) while (!background_thread_should_exit)
{ {
if (views.empty())
background_thread_wake_up.wait(lock);
else
background_thread_wake_up.wait_until(lock, views.front().time_of_check); background_thread_wake_up.wait_until(lock, views.front().time_of_check);
if (background_thread_should_exit) if (background_thread_should_exit)
break; break;
@ -173,14 +148,18 @@ void TemporaryLiveViewCleaner::backgroundThreadFunc()
} }
void TemporaryLiveViewCleaner::startBackgroundThread()
{
if (!background_thread.joinable() && background_thread_can_start && !background_thread_should_exit)
background_thread = ThreadFromGlobalPool{&TemporaryLiveViewCleaner::backgroundThreadFunc, this};
}
void TemporaryLiveViewCleaner::stopBackgroundThread() void TemporaryLiveViewCleaner::stopBackgroundThread()
{ {
if (background_thread.joinable())
{
background_thread_should_exit = true; background_thread_should_exit = true;
background_thread_wake_up.notify_one(); background_thread_wake_up.notify_one();
if (background_thread.joinable())
background_thread.join(); background_thread.join();
}
} }
} }

View File

@ -23,8 +23,7 @@ public:
static void init(Context & global_context_); static void init(Context & global_context_);
static void shutdown(); static void shutdown();
void startupIfNecessary(); void startup();
void startupIfNecessaryImpl(const std::lock_guard<std::mutex> &);
private: private:
friend std::unique_ptr<TemporaryLiveViewCleaner>::deleter_type; friend std::unique_ptr<TemporaryLiveViewCleaner>::deleter_type;
@ -33,6 +32,7 @@ private:
~TemporaryLiveViewCleaner(); ~TemporaryLiveViewCleaner();
void backgroundThreadFunc(); void backgroundThreadFunc();
void startBackgroundThread();
void stopBackgroundThread(); void stopBackgroundThread();
struct StorageAndTimeOfCheck struct StorageAndTimeOfCheck
@ -47,7 +47,7 @@ private:
std::mutex mutex; std::mutex mutex;
std::vector<StorageAndTimeOfCheck> views; std::vector<StorageAndTimeOfCheck> views;
ThreadFromGlobalPool background_thread; ThreadFromGlobalPool background_thread;
bool can_start_background_thread = false; std::atomic<bool> background_thread_can_start = false;
std::atomic<bool> background_thread_should_exit = false; std::atomic<bool> background_thread_should_exit = false;
std::condition_variable background_thread_wake_up; std::condition_variable background_thread_wake_up;
}; };