fix duplicate UUIDs of LiveView on server startup

This commit is contained in:
Alexander Tokmakov 2021-01-13 20:59:20 +03:00
parent c97469773d
commit 5f0f8ae3cd
4 changed files with 45 additions and 12 deletions

View File

@ -26,7 +26,6 @@
#include <Storages/MergeTree/MergeTreeSettings.h> #include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/CompressionCodecSelector.h> #include <Storages/CompressionCodecSelector.h>
#include <Storages/StorageS3Settings.h> #include <Storages/StorageS3Settings.h>
#include <Storages/LiveView/TemporaryLiveViewCleaner.h>
#include <Disks/DiskLocal.h> #include <Disks/DiskLocal.h>
#include <TableFunctions/TableFunctionFactory.h> #include <TableFunctions/TableFunctionFactory.h>
#include <Interpreters/ActionLocksManager.h> #include <Interpreters/ActionLocksManager.h>
@ -429,7 +428,6 @@ struct ContextShared
if (system_logs) if (system_logs)
system_logs->shutdown(); system_logs->shutdown();
TemporaryLiveViewCleaner::shutdown();
DatabaseCatalog::shutdown(); DatabaseCatalog::shutdown();
/// Preemptive destruction is important, because these objects may have a refcount to ContextShared (cyclic reference). /// Preemptive destruction is important, because these objects may have a refcount to ContextShared (cyclic reference).
@ -493,7 +491,6 @@ Context Context::createGlobal(ContextShared * shared)
void Context::initGlobal() void Context::initGlobal()
{ {
DatabaseCatalog::init(*this); DatabaseCatalog::init(*this);
TemporaryLiveViewCleaner::init(*this);
} }
SharedContextHolder Context::createShared() SharedContextHolder Context::createShared()

View File

@ -8,6 +8,7 @@
#include <Poco/File.h> #include <Poco/File.h>
#include <Common/quoteString.h> #include <Common/quoteString.h>
#include <Storages/StorageMemory.h> #include <Storages/StorageMemory.h>
#include <Storages/LiveView/TemporaryLiveViewCleaner.h>
#include <Core/BackgroundSchedulePool.h> #include <Core/BackgroundSchedulePool.h>
#include <Parsers/formatAST.h> #include <Parsers/formatAST.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
@ -148,10 +149,16 @@ void DatabaseCatalog::loadDatabases()
std::lock_guard lock{tables_marked_dropped_mutex}; std::lock_guard lock{tables_marked_dropped_mutex};
if (!tables_marked_dropped.empty()) if (!tables_marked_dropped.empty())
(*drop_task)->schedule(); (*drop_task)->schedule();
/// Another background thread which drops temporary LiveViews.
/// We should start it after loadMarkedAsDroppedTables() to avoid race condition.
TemporaryLiveViewCleaner::instance().startupIfNecessary();
} }
void DatabaseCatalog::shutdownImpl() void DatabaseCatalog::shutdownImpl()
{ {
TemporaryLiveViewCleaner::shutdown();
if (drop_task) if (drop_task)
(*drop_task)->deactivate(); (*drop_task)->deactivate();
@ -524,6 +531,7 @@ std::unique_ptr<DatabaseCatalog> DatabaseCatalog::database_catalog;
DatabaseCatalog::DatabaseCatalog(Context & global_context_) DatabaseCatalog::DatabaseCatalog(Context & global_context_)
: global_context(global_context_), log(&Poco::Logger::get("DatabaseCatalog")) : global_context(global_context_), log(&Poco::Logger::get("DatabaseCatalog"))
{ {
TemporaryLiveViewCleaner::init(global_context);
} }
DatabaseCatalog & DatabaseCatalog::init(Context & global_context_) DatabaseCatalog & DatabaseCatalog::init(Context & global_context_)

View File

@ -48,6 +48,37 @@ void TemporaryLiveViewCleaner::init(Context & global_context_)
the_instance.reset(new TemporaryLiveViewCleaner(global_context_)); the_instance.reset(new TemporaryLiveViewCleaner(global_context_));
} }
void TemporaryLiveViewCleaner::startupIfNecessary()
{
std::lock_guard lock{mutex};
if (background_thread_should_exit)
return;
if (!views.empty())
startupIfNecessaryImpl(lock);
}
void TemporaryLiveViewCleaner::startupIfNecessaryImpl(const std::lock_guard<std::mutex> &)
{
/// If views.empty() the background thread isn't running or it's going to stop right now.
/// If can_start_background_thread is false, then the thread has not been started previously.
bool background_thread_is_running;
if (can_start_background_thread)
{
background_thread_is_running = !views.empty();
}
else
{
can_start_background_thread = true;
background_thread_is_running = false;
}
if (!background_thread_is_running)
{
if (background_thread.joinable())
background_thread.join();
background_thread = ThreadFromGlobalPool{&TemporaryLiveViewCleaner::backgroundThreadFunc, this};
}
}
void TemporaryLiveViewCleaner::shutdown() void TemporaryLiveViewCleaner::shutdown()
{ {
@ -79,20 +110,13 @@ void TemporaryLiveViewCleaner::addView(const std::shared_ptr<StorageLiveView> &
if (background_thread_should_exit) if (background_thread_should_exit)
return; return;
/// If views.empty() the background thread isn't running or it's going to stop right now. if (can_start_background_thread)
bool background_thread_is_running = !views.empty(); startupIfNecessaryImpl(lock);
/// Keep the vector `views` sorted by time of next check. /// Keep the vector `views` sorted by time of next check.
StorageAndTimeOfCheck storage_and_time_of_check{view, time_of_next_check}; StorageAndTimeOfCheck storage_and_time_of_check{view, time_of_next_check};
views.insert(std::upper_bound(views.begin(), views.end(), storage_and_time_of_check), storage_and_time_of_check); views.insert(std::upper_bound(views.begin(), views.end(), storage_and_time_of_check), storage_and_time_of_check);
if (!background_thread_is_running)
{
if (background_thread.joinable())
background_thread.join();
background_thread = ThreadFromGlobalPool{&TemporaryLiveViewCleaner::backgroundThreadFunc, this};
}
background_thread_wake_up.notify_one(); background_thread_wake_up.notify_one();
} }

View File

@ -23,6 +23,9 @@ public:
static void init(Context & global_context_); static void init(Context & global_context_);
static void shutdown(); static void shutdown();
void startupIfNecessary();
void startupIfNecessaryImpl(const std::lock_guard<std::mutex> &);
private: private:
friend std::unique_ptr<TemporaryLiveViewCleaner>::deleter_type; friend std::unique_ptr<TemporaryLiveViewCleaner>::deleter_type;
@ -44,6 +47,7 @@ private:
std::mutex mutex; std::mutex mutex;
std::vector<StorageAndTimeOfCheck> views; std::vector<StorageAndTimeOfCheck> views;
ThreadFromGlobalPool background_thread; ThreadFromGlobalPool background_thread;
bool can_start_background_thread = false;
std::atomic<bool> background_thread_should_exit = false; std::atomic<bool> background_thread_should_exit = false;
std::condition_variable background_thread_wake_up; std::condition_variable background_thread_wake_up;
}; };