From 7362ca686f4be9c4a5e29b82049125d32341b05b Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Fri, 4 Jul 2014 17:59:05 +0400 Subject: [PATCH] Merge --- .../DB/Storages/StorageReplicatedMergeTree.h | 2 ++ .../Storages/StorageReplicatedMergeTree.cpp | 26 +++++++++---------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index ecec400d813..e95feb86c72 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -233,8 +233,10 @@ private: /// Нужно ли завершить фоновые потоки (кроме restarting_thread). volatile bool shutdown_called = false; + Poco::Event shutdown_event; /// Нужно ли завершить restarting_thread. volatile bool permanent_shutdown_called = false; + Poco::Event permanent_shutdown_event; StorageReplicatedMergeTree( const String & zookeeper_path_, diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 67621645c0d..2e9a20a00ec 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -10,7 +10,7 @@ namespace DB { -const auto QUEUE_UPDATE_SLEEP = std::chrono::seconds(5); +const auto QUEUE_UPDATE_SLEEP_MS = 5 * 1000; const auto QUEUE_NO_WORK_SLEEP = std::chrono::seconds(5); const auto QUEUE_ERROR_SLEEP = std::chrono::seconds(1); const auto QUEUE_AFTER_WORK_SLEEP = std::chrono::seconds(0); @@ -38,7 +38,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( data( full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_, index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + table_name), reader(data), writer(data), merger(data), fetcher(data), - log(&Logger::get(database_name_ + "." + table_name + " (StorageReplicatedMergeTree)")) + log(&Logger::get(database_name_ + "." + table_name + " (StorageReplicatedMergeTree)")), + shutdown_event(false), permanent_shutdown_event(false) { if (!zookeeper) { @@ -912,7 +913,7 @@ void StorageReplicatedMergeTree::queueUpdatingThread() tryLogCurrentException(__PRETTY_FUNCTION__); } - std::this_thread::sleep_for(QUEUE_UPDATE_SLEEP); + shutdown_event.tryWait(QUEUE_UPDATE_SLEEP_MS); } } @@ -1105,14 +1106,7 @@ void StorageReplicatedMergeTree::clearOldBlocksThread() tryLogCurrentException(__PRETTY_FUNCTION__); } - /// Спим минуту, но проверяем, нужно ли завершиться, каждую секунду. - /// TODO: Лучше во всех подобных местах использовать condition variable. - for (size_t i = 0; i < 60; ++i) - { - if (shutdown_called || !is_leader_node) - break; - std::this_thread::sleep_for(std::chrono::seconds(1)); - } + shutdown_event.tryWait(60 * 1000); } } @@ -1223,6 +1217,7 @@ void StorageReplicatedMergeTree::shutdown() return; } permanent_shutdown_called = true; + permanent_shutdown_event.set(); restarting_thread.join(); } @@ -1230,6 +1225,7 @@ void StorageReplicatedMergeTree::partialShutdown() { leader_election = nullptr; shutdown_called = true; + shutdown_event.set(); replica_is_active_node = nullptr; merger.cancelAll(); @@ -1258,8 +1254,10 @@ void StorageReplicatedMergeTree::goReadOnly() LOG_INFO(log, "Going to read-only mode"); is_read_only = true; - shutdown_called = true; permanent_shutdown_called = true; + permanent_shutdown_event.set(); + shutdown_called = true; + shutdown_event.set(); leader_election = nullptr; replica_is_active_node = nullptr; @@ -1271,6 +1269,7 @@ void StorageReplicatedMergeTree::goReadOnly() if (is_leader_node) { is_leader_node = false; + merge_selecting_event.set(); if (merge_selecting_thread.joinable()) merge_selecting_thread.join(); if (clear_old_blocks_thread.joinable()) @@ -1286,6 +1285,7 @@ void StorageReplicatedMergeTree::goReadOnly() void StorageReplicatedMergeTree::startup() { shutdown_called = false; + shutdown_event.reset(); merger.uncancelAll(); if (unreplicated_merger) @@ -1324,7 +1324,7 @@ void StorageReplicatedMergeTree::restartingThread() startup(); } - std::this_thread::sleep_for(std::chrono::seconds(2)); + permanent_shutdown_event.tryWait(60 * 1000); } } catch (...)