From 62988800e61042c131bec717b36a369f8a419ca0 Mon Sep 17 00:00:00 2001 From: Vitaliy Zakaznikov Date: Sat, 24 Aug 2019 21:40:24 -0400 Subject: [PATCH] Rewriting implementation of LIVE VIEW no users thread. --- .../LiveView/LiveViewBlockInputStream.h | 21 +++--- .../LiveView/LiveViewEventsBlockInputStream.h | 20 +++--- .../src/Storages/LiveView/StorageLiveView.cpp | 66 +++++++++---------- dbms/src/Storages/LiveView/StorageLiveView.h | 16 ++--- 4 files changed, 63 insertions(+), 60 deletions(-) diff --git a/dbms/src/Storages/LiveView/LiveViewBlockInputStream.h b/dbms/src/Storages/LiveView/LiveViewBlockInputStream.h index 345fceaf095..60839f3e66f 100644 --- a/dbms/src/Storages/LiveView/LiveViewBlockInputStream.h +++ b/dbms/src/Storages/LiveView/LiveViewBlockInputStream.h @@ -18,10 +18,13 @@ using NonBlockingResult = std::pair; public: ~LiveViewBlockInputStream() override { - /// Start storage no users thread - /// if we are the last active user - if (!storage->is_dropped && blocks_ptr.use_count() < 3) - storage->startNoUsersThread(temporary_live_view_timeout_sec); + /// Wake up no users thread + { + std::cerr << "DEBUG: live view block input stream ... send wake up thread\n"; + std::lock_guard lock(storage->no_users_thread_mutex); + storage->no_users_thread_wakeup = true; + storage->no_users_thread_condition.notify_one(); + } } LiveViewBlockInputStream(std::shared_ptr storage_, @@ -29,9 +32,12 @@ public: std::shared_ptr blocks_metadata_ptr_, std::shared_ptr active_ptr_, const bool has_limit_, const UInt64 limit_, - const UInt64 heartbeat_interval_sec_, - const UInt64 temporary_live_view_timeout_sec_) - : storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), blocks_metadata_ptr(std::move(blocks_metadata_ptr_)), active_ptr(std::move(active_ptr_)), has_limit(has_limit_), limit(limit_), heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000), temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_) + const UInt64 heartbeat_interval_sec_) + : storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), + blocks_metadata_ptr(std::move(blocks_metadata_ptr_)), + active_ptr(std::move(active_ptr_)), + has_limit(has_limit_), limit(limit_), + heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000) { /// grab active pointer active = active_ptr.lock(); @@ -200,7 +206,6 @@ private: Int64 num_updates = -1; bool end_of_blocks = false; UInt64 heartbeat_interval_usec; - UInt64 temporary_live_view_timeout_sec; UInt64 last_event_timestamp_usec = 0; Poco::Timestamp timestamp; }; diff --git a/dbms/src/Storages/LiveView/LiveViewEventsBlockInputStream.h b/dbms/src/Storages/LiveView/LiveViewEventsBlockInputStream.h index 120d0098536..e0e6ff78d21 100644 --- a/dbms/src/Storages/LiveView/LiveViewEventsBlockInputStream.h +++ b/dbms/src/Storages/LiveView/LiveViewEventsBlockInputStream.h @@ -37,10 +37,12 @@ using NonBlockingResult = std::pair; public: ~LiveViewEventsBlockInputStream() override { - /// Start storage no users thread - /// if we are the last active user - if (!storage->is_dropped && blocks_ptr.use_count() < 3) - storage->startNoUsersThread(temporary_live_view_timeout_sec); + /// Wake up no users thread + { + std::lock_guard lock(storage->no_users_thread_mutex); + storage->no_users_thread_wakeup = true; + storage->no_users_thread_condition.notify_one(); + } } /// length default -2 because we want LIMIT to specify number of updates so that LIMIT 1 waits for 1 update /// and LIMIT 0 just returns data without waiting for any updates @@ -49,9 +51,12 @@ public: std::shared_ptr blocks_metadata_ptr_, std::shared_ptr active_ptr_, const bool has_limit_, const UInt64 limit_, - const UInt64 heartbeat_interval_sec_, - const UInt64 temporary_live_view_timeout_sec_) - : storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), blocks_metadata_ptr(std::move(blocks_metadata_ptr_)), active_ptr(std::move(active_ptr_)), has_limit(has_limit_), limit(limit_), heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000), temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_) + const UInt64 heartbeat_interval_sec_) + : storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), + blocks_metadata_ptr(std::move(blocks_metadata_ptr_)), + active_ptr(std::move(active_ptr_)), has_limit(has_limit_), + limit(limit_), + heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000) { /// grab active pointer active = active_ptr.lock(); @@ -236,7 +241,6 @@ private: Int64 num_updates = -1; bool end_of_blocks = false; UInt64 heartbeat_interval_usec; - UInt64 temporary_live_view_timeout_sec; UInt64 last_event_timestamp_usec = 0; Poco::Timestamp timestamp; }; diff --git a/dbms/src/Storages/LiveView/StorageLiveView.cpp b/dbms/src/Storages/LiveView/StorageLiveView.cpp index 3c0d205fa3f..d5de3b4a914 100644 --- a/dbms/src/Storages/LiveView/StorageLiveView.cpp +++ b/dbms/src/Storages/LiveView/StorageLiveView.cpp @@ -363,45 +363,49 @@ void StorageLiveView::checkTableCanBeDropped() const } } -void StorageLiveView::noUsersThread(const UInt64 & timeout) +void StorageLiveView::noUsersThread(std::shared_ptr storage, const UInt64 & timeout) { - if (shutdown_called) - return; - bool drop_table = false; + if (storage->shutdown_called || storage->is_dropped) + return; + { while (1) { - std::unique_lock lock(no_users_thread_mutex); - if (!no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return no_users_thread_wakeup; })) + std::unique_lock lock(storage->no_users_thread_mutex); + if(!storage->no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return storage->no_users_thread_wakeup; })) { - no_users_thread_wakeup = false; - if (shutdown_called) + storage->no_users_thread_wakeup = false; + if (storage->shutdown_called || storage->is_dropped) return; - if (hasUsers()) - return; - if (!global_context.getDependencies(database_name, table_name).empty()) + if (storage->hasUsers()) + continue; + if (!storage->global_context.getDependencies(storage->database_name, storage->table_name).empty()) continue; drop_table = true; } + else { + storage->no_users_thread_wakeup = false; + continue; + } break; } } if (drop_table) { - if (global_context.tryGetTable(database_name, table_name)) + if (storage->global_context.tryGetTable(storage->database_name, storage->table_name)) { try { /// We create and execute `drop` query for this table auto drop_query = std::make_shared(); - drop_query->database = database_name; - drop_query->table = table_name; + drop_query->database = storage->database_name; + drop_query->table = storage->table_name; drop_query->kind = ASTDropQuery::Kind::Drop; ASTPtr ast_drop_query = drop_query; - InterpreterDropQuery drop_interpreter(ast_drop_query, global_context); + InterpreterDropQuery drop_interpreter(ast_drop_query, storage->global_context); drop_interpreter.execute(); } catch (...) @@ -413,13 +417,6 @@ void StorageLiveView::noUsersThread(const UInt64 & timeout) void StorageLiveView::startNoUsersThread(const UInt64 & timeout) { - bool expected = false; - if (!start_no_users_thread_called.compare_exchange_strong(expected, true)) - return; - - if (is_dropped) - return; - if (is_temporary) { if (no_users_thread.joinable()) @@ -435,10 +432,9 @@ void StorageLiveView::startNoUsersThread(const UInt64 & timeout) std::lock_guard lock(no_users_thread_mutex); no_users_thread_wakeup = false; } - if (!is_dropped) - no_users_thread = std::thread(&StorageLiveView::noUsersThread, this, timeout); + no_users_thread = std::thread(&StorageLiveView::noUsersThread, + std::static_pointer_cast(shared_from_this()), timeout); } - start_no_users_thread_called = false; } void StorageLiveView::startup() @@ -454,19 +450,19 @@ void StorageLiveView::shutdown() if (no_users_thread.joinable()) { - std::lock_guard lock(no_users_thread_mutex); - no_users_thread_wakeup = true; - no_users_thread_condition.notify_one(); - /// Must detach the no users thread - /// as we can't join it as it will result - /// in a deadlock - no_users_thread.detach(); /// TODO Not viable at all. + { + std::lock_guard lock(no_users_thread_mutex); + no_users_thread_wakeup = true; + no_users_thread_condition.notify_one(); + } } } StorageLiveView::~StorageLiveView() { shutdown(); + if (no_users_thread.joinable()) + no_users_thread.detach(); } void StorageLiveView::drop() @@ -532,8 +528,7 @@ BlockInputStreams StorageLiveView::watch( if (query.is_watch_events) { - auto reader = std::make_shared(std::static_pointer_cast(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(), - context.getSettingsRef().temporary_live_view_timeout.totalSeconds()); + auto reader = std::make_shared(std::static_pointer_cast(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, context.getSettingsRef().live_view_heartbeat_interval.totalSeconds()); if (no_users_thread.joinable()) { @@ -557,8 +552,7 @@ BlockInputStreams StorageLiveView::watch( } else { - auto reader = std::make_shared(std::static_pointer_cast(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(), - context.getSettingsRef().temporary_live_view_timeout.totalSeconds()); + auto reader = std::make_shared(std::static_pointer_cast(shared_from_this()), blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, context.getSettingsRef().live_view_heartbeat_interval.totalSeconds()); if (no_users_thread.joinable()) { diff --git a/dbms/src/Storages/LiveView/StorageLiveView.h b/dbms/src/Storages/LiveView/StorageLiveView.h index 9930d8d6154..a9a8985b4f8 100644 --- a/dbms/src/Storages/LiveView/StorageLiveView.h +++ b/dbms/src/Storages/LiveView/StorageLiveView.h @@ -71,11 +71,9 @@ public: { return active_ptr.use_count() > 1; } - /// Background thread for temporary tables - /// which drops this table if there are no users - void startNoUsersThread(const UInt64 & timeout); + /// No users thread mutex, predicate and wake up condition std::mutex no_users_thread_mutex; - bool no_users_thread_wakeup{false}; + bool no_users_thread_wakeup = false; std::condition_variable no_users_thread_condition; /// Get blocks hash /// must be called with mutex locked @@ -149,7 +147,7 @@ private: String database_name; ASTPtr inner_query; Context & global_context; - bool is_temporary {false}; + bool is_temporary = false; mutable Block sample_block; /// Mutex for the blocks and ready condition @@ -166,10 +164,12 @@ private: std::shared_ptr blocks_metadata_ptr; BlocksPtrs mergeable_blocks; - void noUsersThread(const UInt64 & timeout); + /// Background thread for temporary tables + /// which drops this table if there are no users + void startNoUsersThread(const UInt64 & timeout); + static void noUsersThread(std::shared_ptr storage, const UInt64 & timeout); std::thread no_users_thread; - std::atomic shutdown_called{false}; - std::atomic start_no_users_thread_called{false}; + std::atomic shutdown_called = false; UInt64 temporary_live_view_timeout; StorageLiveView(