From 2a9ab482792cdadf0d4e2365c3d11494a3e38230 Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Tue, 8 Sep 2020 02:08:17 +0300 Subject: [PATCH] Use join() instead of detach() for the no_users_thread in StorageLiveView. --- src/Interpreters/Context.cpp | 8 + src/Interpreters/Context.h | 8 +- src/Interpreters/InterpreterDropQuery.h | 1 + .../LiveView/LiveViewBlockInputStream.h | 15 +- .../LiveView/LiveViewEventsBlockInputStream.h | 14 +- src/Storages/LiveView/StorageLiveView.cpp | 144 +---------------- src/Storages/LiveView/StorageLiveView.h | 23 ++- .../LiveView/TemporaryLiveViewCleaner.cpp | 148 ++++++++++++++++++ .../LiveView/TemporaryLiveViewCleaner.h | 51 ++++++ src/Storages/ya.make | 1 + 10 files changed, 233 insertions(+), 180 deletions(-) create mode 100644 src/Storages/LiveView/TemporaryLiveViewCleaner.cpp create mode 100644 src/Storages/LiveView/TemporaryLiveViewCleaner.h diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 70cf41a679c..3c4c095cc26 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -423,6 +424,7 @@ struct ContextShared if (system_logs) system_logs->shutdown(); + TemporaryLiveViewCleaner::shutdown(); DatabaseCatalog::shutdown(); /// Preemptive destruction is important, because these objects may have a refcount to ContextShared (cyclic reference). @@ -479,6 +481,12 @@ Context Context::createGlobal(ContextShared * shared) return res; } +void Context::initGlobal() +{ + DatabaseCatalog::init(this); + TemporaryLiveViewCleaner::init(*this); +} + SharedContextHolder Context::createShared() { return SharedContextHolder(std::make_unique()); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index c8d13baa9ae..743c92d56b5 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -445,11 +445,7 @@ public: void makeQueryContext() { query_context = this; } void makeSessionContext() { session_context = this; } - void makeGlobalContext() - { - global_context = this; - DatabaseCatalog::init(this); - } + void makeGlobalContext() { initGlobal(); global_context = this; } const Settings & getSettingsRef() const { return settings; } @@ -622,6 +618,8 @@ public: private: std::unique_lock getLock() const; + void initGlobal(); + /// Compute and set actual user settings, client_info.current_user should be set void calculateAccessRights(); diff --git a/src/Interpreters/InterpreterDropQuery.h b/src/Interpreters/InterpreterDropQuery.h index 80bd6c6531a..b54736b5c21 100644 --- a/src/Interpreters/InterpreterDropQuery.h +++ b/src/Interpreters/InterpreterDropQuery.h @@ -10,6 +10,7 @@ namespace DB { class Context; using DatabaseAndTable = std::pair; +class AccessRightsElements; /** Allow to either drop table with all its data (DROP), * or remove information about table (just forget) from server (DETACH), diff --git a/src/Storages/LiveView/LiveViewBlockInputStream.h b/src/Storages/LiveView/LiveViewBlockInputStream.h index 7cab2cb41ed..737e76754c5 100644 --- a/src/Storages/LiveView/LiveViewBlockInputStream.h +++ b/src/Storages/LiveView/LiveViewBlockInputStream.h @@ -16,27 +16,17 @@ class LiveViewBlockInputStream : public IBlockInputStream using NonBlockingResult = std::pair; public: - ~LiveViewBlockInputStream() override - { - /// Start storage no users thread - /// if we are the last active user - if (!storage->is_dropped && blocks_ptr.use_count() < 3) - storage->startNoUsersThread(temporary_live_view_timeout_sec); - } - LiveViewBlockInputStream(std::shared_ptr storage_, std::shared_ptr blocks_ptr_, std::shared_ptr blocks_metadata_ptr_, std::shared_ptr active_ptr_, const bool has_limit_, const UInt64 limit_, - const UInt64 heartbeat_interval_sec_, - const UInt64 temporary_live_view_timeout_sec_) + const UInt64 heartbeat_interval_sec_) : storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), blocks_metadata_ptr(std::move(blocks_metadata_ptr_)), active_ptr(std::move(active_ptr_)), has_limit(has_limit_), limit(limit_), - heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000), - temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_) + heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000) { /// grab active pointer active = active_ptr.lock(); @@ -205,7 +195,6 @@ private: Int64 num_updates = -1; bool end_of_blocks = false; UInt64 heartbeat_interval_usec; - UInt64 temporary_live_view_timeout_sec; UInt64 last_event_timestamp_usec = 0; }; diff --git a/src/Storages/LiveView/LiveViewEventsBlockInputStream.h b/src/Storages/LiveView/LiveViewEventsBlockInputStream.h index ac5e7e3d6fd..4060b17c1ed 100644 --- a/src/Storages/LiveView/LiveViewEventsBlockInputStream.h +++ b/src/Storages/LiveView/LiveViewEventsBlockInputStream.h @@ -34,13 +34,6 @@ class LiveViewEventsBlockInputStream : public IBlockInputStream using NonBlockingResult = std::pair; public: - ~LiveViewEventsBlockInputStream() override - { - /// Start storage no users thread - /// if we are the last active user - if (!storage->is_dropped && blocks_ptr.use_count() < 3) - storage->startNoUsersThread(temporary_live_view_timeout_sec); - } /// length default -2 because we want LIMIT to specify number of updates so that LIMIT 1 waits for 1 update /// and LIMIT 0 just returns data without waiting for any updates LiveViewEventsBlockInputStream(std::shared_ptr storage_, @@ -48,14 +41,12 @@ public: std::shared_ptr blocks_metadata_ptr_, std::shared_ptr active_ptr_, const bool has_limit_, const UInt64 limit_, - const UInt64 heartbeat_interval_sec_, - const UInt64 temporary_live_view_timeout_sec_) + const UInt64 heartbeat_interval_sec_) : storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), blocks_metadata_ptr(std::move(blocks_metadata_ptr_)), active_ptr(std::move(active_ptr_)), has_limit(has_limit_), limit(limit_), - heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000), - temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_) + heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000) { /// grab active pointer active = active_ptr.lock(); @@ -236,7 +227,6 @@ private: Int64 num_updates = -1; bool end_of_blocks = false; UInt64 heartbeat_interval_usec; - UInt64 temporary_live_view_timeout_sec; UInt64 last_event_timestamp_usec = 0; Poco::Timestamp timestamp; }; diff --git a/src/Storages/LiveView/StorageLiveView.cpp b/src/Storages/LiveView/StorageLiveView.cpp index 54ac5bcc791..b16c02eec6b 100644 --- a/src/Storages/LiveView/StorageLiveView.cpp +++ b/src/Storages/LiveView/StorageLiveView.cpp @@ -12,10 +12,8 @@ limitations under the License. */ #include #include #include -#include #include #include -#include #include #include #include @@ -31,6 +29,7 @@ limitations under the License. */ #include #include #include +#include #include #include @@ -276,7 +275,7 @@ StorageLiveView::StorageLiveView( if (query.live_view_timeout) { is_temporary = true; - temporary_live_view_timeout = *query.live_view_timeout; + temporary_live_view_timeout = std::chrono::seconds{*query.live_view_timeout}; } blocks_ptr = std::make_shared(); @@ -384,128 +383,21 @@ void StorageLiveView::checkTableCanBeDropped() const } } -void StorageLiveView::noUsersThread(std::shared_ptr storage, const UInt64 & timeout) -{ - bool drop_table = false; - - if (storage->shutdown_called) - return; - - auto table_id = storage->getStorageID(); - { - while (true) - { - std::unique_lock lock(storage->no_users_thread_wakeup_mutex); - if (!storage->no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return storage->no_users_thread_wakeup; })) - { - storage->no_users_thread_wakeup = false; - if (storage->shutdown_called) - return; - if (storage->hasUsers()) - return; - if (!DatabaseCatalog::instance().getDependencies(table_id).empty()) - continue; - drop_table = true; - } - break; - } - } - - if (drop_table) - { - if (DatabaseCatalog::instance().tryGetTable(table_id, storage->global_context)) - { - try - { - /// We create and execute `drop` query for this table - auto drop_query = std::make_shared(); - drop_query->database = table_id.database_name; - drop_query->table = table_id.table_name; - drop_query->kind = ASTDropQuery::Kind::Drop; - ASTPtr ast_drop_query = drop_query; - InterpreterDropQuery drop_interpreter(ast_drop_query, storage->global_context); - drop_interpreter.execute(); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } - } -} - -void StorageLiveView::startNoUsersThread(const UInt64 & timeout) -{ - bool expected = false; - if (!start_no_users_thread_called.compare_exchange_strong(expected, true)) - return; - - if (is_temporary) - { - std::lock_guard no_users_thread_lock(no_users_thread_mutex); - - if (shutdown_called) - return; - - if (no_users_thread.joinable()) - { - { - std::lock_guard lock(no_users_thread_wakeup_mutex); - no_users_thread_wakeup = true; - no_users_thread_condition.notify_one(); - } - no_users_thread.join(); - } - { - std::lock_guard lock(no_users_thread_wakeup_mutex); - no_users_thread_wakeup = false; - } - if (!is_dropped) - no_users_thread = std::thread(&StorageLiveView::noUsersThread, - std::static_pointer_cast(shared_from_this()), timeout); - } - - start_no_users_thread_called = false; -} - void StorageLiveView::startup() { - startNoUsersThread(temporary_live_view_timeout); + if (is_temporary) + TemporaryLiveViewCleaner::instance().addView(std::static_pointer_cast(shared_from_this())); } void StorageLiveView::shutdown() { + shutdown_called = true; DatabaseCatalog::instance().removeDependency(select_table_id, getStorageID()); - bool expected = false; - if (!shutdown_called.compare_exchange_strong(expected, true)) - return; - - /// WATCH queries should be stopped after setting shutdown_called to true. - /// Otherwise livelock is possible for LiveView table in Atomic database: - /// WATCH query will wait for table to be dropped and DatabaseCatalog will wait for queries to finish - - { - std::lock_guard no_users_thread_lock(no_users_thread_mutex); - if (no_users_thread.joinable()) - { - { - std::lock_guard lock(no_users_thread_wakeup_mutex); - no_users_thread_wakeup = true; - no_users_thread_condition.notify_one(); - } - } - } } StorageLiveView::~StorageLiveView() { shutdown(); - - { - std::lock_guard lock(no_users_thread_mutex); - if (no_users_thread.joinable()) - no_users_thread.detach(); - } } void StorageLiveView::drop() @@ -572,18 +464,7 @@ BlockInputStreams StorageLiveView::watch( auto reader = std::make_shared( std::static_pointer_cast(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, - context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(), - temporary_live_view_timeout); - - { - std::lock_guard no_users_thread_lock(no_users_thread_mutex); - if (no_users_thread.joinable()) - { - std::lock_guard lock(no_users_thread_wakeup_mutex); - no_users_thread_wakeup = true; - no_users_thread_condition.notify_one(); - } - } + context.getSettingsRef().live_view_heartbeat_interval.totalSeconds()); { std::lock_guard lock(mutex); @@ -603,18 +484,7 @@ BlockInputStreams StorageLiveView::watch( auto reader = std::make_shared( std::static_pointer_cast(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, - context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(), - temporary_live_view_timeout); - - { - std::lock_guard no_users_thread_lock(no_users_thread_mutex); - if (no_users_thread.joinable()) - { - std::lock_guard lock(no_users_thread_wakeup_mutex); - no_users_thread_wakeup = true; - no_users_thread_condition.notify_one(); - } - } + context.getSettingsRef().live_view_heartbeat_interval.totalSeconds()); { std::lock_guard lock(mutex); diff --git a/src/Storages/LiveView/StorageLiveView.h b/src/Storages/LiveView/StorageLiveView.h index 43afd169a92..32e18ef6092 100644 --- a/src/Storages/LiveView/StorageLiveView.h +++ b/src/Storages/LiveView/StorageLiveView.h @@ -38,6 +38,10 @@ using ASTPtr = std::shared_ptr; using BlocksMetadataPtr = std::shared_ptr; using MergeableBlocksPtr = std::shared_ptr; +class Pipe; +using Pipes = std::vector; + + class StorageLiveView final : public ext::shared_ptr_helper, public IStorage { friend struct ext::shared_ptr_helper; @@ -70,7 +74,9 @@ public: NamesAndTypesList getVirtuals() const override; - bool isTemporary() { return is_temporary; } + bool isTemporary() const { return is_temporary; } + std::chrono::seconds getTimeout() const { return temporary_live_view_timeout; } + /// Check if we have any readers /// must be called with mutex locked @@ -85,11 +91,7 @@ public: { return active_ptr.use_count() > 1; } - /// No users thread mutex, predicate and wake up condition - void startNoUsersThread(const UInt64 & timeout); - std::mutex no_users_thread_wakeup_mutex; - bool no_users_thread_wakeup = false; - std::condition_variable no_users_thread_condition; + /// Get blocks hash /// must be called with mutex locked String getBlocksHashKey() @@ -175,6 +177,8 @@ private: std::unique_ptr live_view_context; bool is_temporary = false; + std::chrono::seconds temporary_live_view_timeout; + /// Mutex to protect access to sample block and inner_blocks_query mutable std::mutex sample_block_lock; mutable Block sample_block; @@ -193,14 +197,7 @@ private: std::shared_ptr blocks_metadata_ptr; MergeableBlocksPtr mergeable_blocks; - /// Background thread for temporary tables - /// which drops this table if there are no users - static void noUsersThread(std::shared_ptr storage, const UInt64 & timeout); - std::mutex no_users_thread_mutex; - std::thread no_users_thread; std::atomic shutdown_called = false; - std::atomic start_no_users_thread_called = false; - UInt64 temporary_live_view_timeout; StorageLiveView( const StorageID & table_id_, diff --git a/src/Storages/LiveView/TemporaryLiveViewCleaner.cpp b/src/Storages/LiveView/TemporaryLiveViewCleaner.cpp new file mode 100644 index 00000000000..0f7c1039d72 --- /dev/null +++ b/src/Storages/LiveView/TemporaryLiveViewCleaner.cpp @@ -0,0 +1,148 @@ +#include +#include +#include +#include +#include + + +namespace DB +{ +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + + +namespace +{ + void executeDropQuery(const StorageID & storage_id, Context & context) + { + if (!DatabaseCatalog::instance().isTableExist(storage_id, context)) + return; + try + { + /// We create and execute `drop` query for this table + auto drop_query = std::make_shared(); + drop_query->database = storage_id.database_name; + drop_query->table = storage_id.table_name; + drop_query->kind = ASTDropQuery::Kind::Drop; + ASTPtr ast_drop_query = drop_query; + InterpreterDropQuery drop_interpreter(ast_drop_query, context); + drop_interpreter.execute(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } +} + + +std::unique_ptr TemporaryLiveViewCleaner::the_instance; + + +void TemporaryLiveViewCleaner::init(Context & global_context_) +{ + if (the_instance) + throw Exception("TemporaryLiveViewCleaner already initialized", ErrorCodes::LOGICAL_ERROR); + the_instance.reset(new TemporaryLiveViewCleaner(global_context_)); +} + + +void TemporaryLiveViewCleaner::shutdown() +{ + the_instance.reset(); +} + + +TemporaryLiveViewCleaner::TemporaryLiveViewCleaner(Context & global_context_) + : global_context(global_context_) +{ +} + + +TemporaryLiveViewCleaner::~TemporaryLiveViewCleaner() +{ + stopBackgroundThread(); +} + + +void TemporaryLiveViewCleaner::addView(const std::shared_ptr & view) +{ + if (!view->isTemporary()) + return; + + auto current_time = std::chrono::system_clock::now(); + auto time_of_next_check = current_time + view->getTimeout(); + + std::lock_guard lock{mutex}; + + /// Keep the vector `views` sorted by time of next check. + StorageAndTimeOfCheck storage_and_time_of_check{view, time_of_next_check}; + views.insert(std::upper_bound(views.begin(), views.end(), storage_and_time_of_check), storage_and_time_of_check); + + if (!background_thread.joinable()) + background_thread = ThreadFromGlobalPool{&TemporaryLiveViewCleaner::backgroundThreadFunc, this}; + + background_thread_wake_up.notify_one(); +} + + +void TemporaryLiveViewCleaner::backgroundThreadFunc() +{ + std::unique_lock lock{mutex}; + while (!background_thread_should_exit && !views.empty()) + { + background_thread_wake_up.wait_until(lock, views.front().time_of_check); + if (background_thread_should_exit) + return; + + auto current_time = std::chrono::system_clock::now(); + std::vector storages_to_drop; + + auto it = views.begin(); + while (it != views.end()) + { + std::shared_ptr storage = it->storage.lock(); + auto & time_of_check = it->time_of_check; + if (!storage) + { + /// Storage has been already removed. + it = views.erase(it); + continue; + } + + ++it; + + if (current_time < time_of_check) + break; /// It's not the time to check it yet. + + time_of_check = current_time + storage->getTimeout(); + + auto storage_id = storage->getStorageID(); + if (storage->hasUsers() || !DatabaseCatalog::instance().getDependencies(storage_id).empty()) + continue; + + storages_to_drop.emplace_back(storage_id); + } + + lock.unlock(); + for (const auto & storage_id : storages_to_drop) + executeDropQuery(storage_id, global_context); + lock.lock(); + } +} + + +void TemporaryLiveViewCleaner::stopBackgroundThread() +{ + std::lock_guard lock{mutex}; + if (background_thread.joinable()) + { + background_thread_should_exit = true; + background_thread_wake_up.notify_one(); + background_thread.join(); + } +} + +} diff --git a/src/Storages/LiveView/TemporaryLiveViewCleaner.h b/src/Storages/LiveView/TemporaryLiveViewCleaner.h new file mode 100644 index 00000000000..57c12bd1c07 --- /dev/null +++ b/src/Storages/LiveView/TemporaryLiveViewCleaner.h @@ -0,0 +1,51 @@ +#pragma once + +#include +#include + + +namespace DB +{ +class StorageLiveView; +struct StorageID; + +/// This class removes temporary live views in the background thread when it's possible. +/// There should only a single instance of this class. +class TemporaryLiveViewCleaner +{ +public: + static TemporaryLiveViewCleaner & instance() { return *the_instance; } + + /// Drops a specified live view after a while if it's temporary. + void addView(const std::shared_ptr & view); + + /// Should be called once. + static void init(Context & global_context_); + static void shutdown(); + +private: + friend std::unique_ptr::deleter_type; + + TemporaryLiveViewCleaner(Context & global_context_); + ~TemporaryLiveViewCleaner(); + + void backgroundThreadFunc(); + void stopBackgroundThread(); + + struct StorageAndTimeOfCheck + { + std::weak_ptr storage; + std::chrono::system_clock::time_point time_of_check; + bool operator <(const StorageAndTimeOfCheck & other) const { return time_of_check < other.time_of_check; } + }; + + static std::unique_ptr the_instance; + Context & global_context; + std::mutex mutex; + std::vector views; + ThreadFromGlobalPool background_thread; + std::atomic background_thread_should_exit = false; + std::condition_variable background_thread_wake_up; +}; + +} diff --git a/src/Storages/ya.make b/src/Storages/ya.make index 1ddb8c77072..fed961ed2bb 100644 --- a/src/Storages/ya.make +++ b/src/Storages/ya.make @@ -20,6 +20,7 @@ SRCS( IStorage.cpp KeyDescription.cpp LiveView/StorageLiveView.cpp + LiveView/TemporaryLiveViewCleaner.cpp MergeTree/ActiveDataPartSet.cpp MergeTree/AllMergeSelector.cpp MergeTree/BackgroundProcessingPool.cpp