diff --git a/dbms/src/Common/ZooKeeper/LeaderElection.h b/dbms/src/Common/ZooKeeper/LeaderElection.h index 8dd9b1831b1..1786cc76510 100644 --- a/dbms/src/Common/ZooKeeper/LeaderElection.h +++ b/dbms/src/Common/ZooKeeper/LeaderElection.h @@ -41,10 +41,15 @@ public: createNode(); } - void yield() + void shutdown() { - releaseNode(); - createNode(); + if (shutdown_called) + return; + + shutdown_called = true; + event->set(); + if (thread.joinable()) + thread.join(); } ~LeaderElection() @@ -62,14 +67,14 @@ private: std::string node_name; std::thread thread; - std::atomic shutdown {false}; + std::atomic shutdown_called {false}; zkutil::EventPtr event = std::make_shared(); CurrentMetrics::Increment metric_increment{CurrentMetrics::LeaderElection}; void createNode() { - shutdown = false; + shutdown_called = false; node = EphemeralNodeHolder::createSequential(path + "/leader_election-", zookeeper, identifier); std::string node_path = node->getPath(); @@ -80,16 +85,13 @@ private: void releaseNode() { - shutdown = true; - event->set(); - if (thread.joinable()) - thread.join(); + shutdown(); node = nullptr; } void threadFunction() { - while (!shutdown) + while (!shutdown_called) { bool success = false; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index f7dca129bd3..fb3276fcfab 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -52,7 +52,7 @@ void ReplicatedMergeTreeCleanupThread::iterate() /// This is loose condition: no problem if we actually had lost leadership at this moment /// and two replicas will try to do cleanup simultaneously. - if (storage.is_leader_node) + if (storage.is_leader) { clearOldLogs(); clearOldBlocks(); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 6b20b5c86c1..1cd958c60d6 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -17,7 +17,6 @@ namespace ProfileEvents namespace CurrentMetrics { extern const Metric ReadonlyReplica; - extern const Metric LeaderReplica; } @@ -139,7 +138,7 @@ void ReplicatedMergeTreeRestartingThread::run() prev_time_of_check_delay = current_time; /// We give up leadership if the relative lag is greater than threshold. - if (storage.is_leader_node + if (storage.is_leader && relative_delay > static_cast(storage.data.settings.min_relative_delay_to_yield_leadership)) { LOG_INFO(log, "Relative replica delay (" << relative_delay << " seconds) is bigger than threshold (" @@ -147,11 +146,11 @@ void ReplicatedMergeTreeRestartingThread::run() ProfileEvents::increment(ProfileEvents::ReplicaYieldLeadership); - storage.is_leader_node = false; - CurrentMetrics::sub(CurrentMetrics::LeaderReplica); - if (storage.merge_selecting_thread.joinable()) - storage.merge_selecting_thread.join(); - storage.leader_election->yield(); + storage.exitLeaderElection(); + /// NOTE: enterLeaderElection() can throw if node creation in ZK fails. + /// This is bad because we can end up without a leader on any replica. + /// In this case we rely on the fact that the session will expire and we will reconnect. + storage.enterLeaderElection(); } } } @@ -169,6 +168,8 @@ void ReplicatedMergeTreeRestartingThread::run() storage.data_parts_exchange_endpoint_holder->cancelForever(); storage.data_parts_exchange_endpoint_holder = nullptr; + /// Cancel fetches and merges to force the queue_task to finish ASAP. + storage.fetcher.blocker.cancelForever(); storage.merger.merges_blocker.cancelForever(); partialShutdown(); @@ -195,12 +196,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() updateQuorumIfWeHavePart(); if (storage.data.settings.replicated_can_become_leader) - storage.leader_election = std::make_shared( - storage.zookeeper_path + "/leader_election", - *storage.current_zookeeper, /// current_zookeeper lives for the lifetime of leader_election, - /// since before changing `current_zookeeper`, `leader_election` object is destroyed in `partialShutdown` method. - [this] { storage.becomeLeader(); CurrentMetrics::add(CurrentMetrics::LeaderReplica); }, - storage.replica_name); + storage.enterLeaderElection(); /// Anything above can throw a KeeperException if something is wrong with ZK. /// Anything below should not throw exceptions. @@ -222,7 +218,6 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() catch (...) { storage.replica_is_active_node = nullptr; - storage.leader_election = nullptr; try { @@ -366,17 +361,9 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown() storage.replica_is_active_node = nullptr; LOG_TRACE(log, "Waiting for threads to finish"); - { - std::lock_guard lock(storage.leader_node_mutex); - bool old_val = true; - if (storage.is_leader_node.compare_exchange_strong(old_val, false)) - { - CurrentMetrics::sub(CurrentMetrics::LeaderReplica); - if (storage.merge_selecting_thread.joinable()) - storage.merge_selecting_thread.join(); - } - } + storage.exitLeaderElection(); + if (storage.queue_updating_thread.joinable()) storage.queue_updating_thread.join(); @@ -384,20 +371,6 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown() storage.alter_thread.reset(); storage.part_check_thread.stop(); - /// Yielding leadership only after finish of merge_selecting_thread. - /// Otherwise race condition with parallel run of merge selecting thread on different servers is possible. - /// - /// On the other hand, leader_election could call becomeLeader() from own thread after - /// merge_selecting_thread is finished and restarting_thread is destroyed. - /// becomeLeader() recreates merge_selecting_thread and it becomes joinable again, even restarting_thread is destroyed. - /// But restarting_thread is responsible to stop merge_selecting_thread. - /// It will lead to std::terminate in ~StorageReplicatedMergeTree(). - /// Such behaviour was rarely observed on DROP queries. - /// Therefore we need either avoid becoming leader after first shutdown call (more deliberate choice), - /// either manually wait merge_selecting_thread.join() inside ~StorageReplicatedMergeTree(), either or something third. - /// So, we added shutdown check in becomeLeader() and made its creation and deletion atomic. - storage.leader_election = nullptr; - LOG_TRACE(log, "Threads finished"); } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 39213653478..1548537e390 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -59,6 +59,12 @@ namespace ProfileEvents extern const Event DataAfterMergeDiffersFromReplica; } +namespace CurrentMetrics +{ + extern const Metric LeaderReplica; +} + + namespace DB { @@ -1883,7 +1889,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread() && cached_merging_predicate.get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right); }; - while (!shutdown_called && is_leader_node) + while (is_leader) { bool success = false; @@ -1932,7 +1938,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread() tryLogCurrentException(__PRETTY_FUNCTION__); } - if (shutdown_called || !is_leader_node) + if (!is_leader) break; if (!success) @@ -2037,23 +2043,55 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n } -void StorageReplicatedMergeTree::becomeLeader() +void StorageReplicatedMergeTree::enterLeaderElection() { - std::lock_guard lock(leader_node_mutex); + auto callback = [this]() + { + CurrentMetrics::add(CurrentMetrics::LeaderReplica); + LOG_INFO(log, "Became leader"); - if (shutdown_called) + is_leader = true; + merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this); + }; + + try + { + leader_election = std::make_shared( + zookeeper_path + "/leader_election", + *current_zookeeper, /// current_zookeeper lives for the lifetime of leader_election, + /// since before changing `current_zookeeper`, `leader_election` object is destroyed in `partialShutdown` method. + callback, + replica_name); + } + catch (...) + { + leader_election = nullptr; + throw; + } +} + +void StorageReplicatedMergeTree::exitLeaderElection() +{ + if (!leader_election) return; - if (merge_selecting_thread.joinable()) + /// Shut down the leader election thread to avoid suddenly becoming the leader again after + /// we have stopped the merge_selecting_thread, but before we have deleted the leader_election object. + leader_election->shutdown(); + + if (is_leader) { - LOG_INFO(log, "Deleting old leader"); - is_leader_node = false; /// exit trigger inside thread + CurrentMetrics::sub(CurrentMetrics::LeaderReplica); + LOG_INFO(log, "Stopped being leader"); + + is_leader = false; + merge_selecting_event.set(); merge_selecting_thread.join(); } - LOG_INFO(log, "Became leader"); - is_leader_node = true; - merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this); + /// Delete the node in ZK only after we have stopped the merge_selecting_thread - so that only one + /// replica assigns merges at any given time. + leader_election = nullptr; } @@ -2382,12 +2420,6 @@ void StorageReplicatedMergeTree::startup() void StorageReplicatedMergeTree::shutdown() { - /** This must be done before waiting for restarting_thread. - * Because restarting_thread will wait for finishing of tasks in background pool, - * and parts are fetched in that tasks. - */ - fetcher.blocker.cancelForever(); - if (restarting_thread) { restarting_thread->stop(); @@ -2399,6 +2431,8 @@ void StorageReplicatedMergeTree::shutdown() data_parts_exchange_endpoint_holder->cancelForever(); data_parts_exchange_endpoint_holder = nullptr; } + + fetcher.blocker.cancelForever(); } @@ -2487,7 +2521,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p { assertNotReadonly(); - if (!is_leader_node) + if (!is_leader) { sendRequestToLeaderReplica(query, context.getSettingsRef()); return true; @@ -2813,7 +2847,7 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & query, const ASTPt zkutil::ZooKeeperPtr zookeeper = getZooKeeper(); - if (!is_leader_node) + if (!is_leader) { sendRequestToLeaderReplica(query, context.getSettingsRef()); return; @@ -3171,7 +3205,7 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields) { auto zookeeper = tryGetZooKeeper(); - res.is_leader = is_leader_node; + res.is_leader = is_leader; res.is_readonly = is_readonly; res.is_session_expired = !zookeeper || zookeeper->expired(); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index c2b09a77bf1..5d0659f19f5 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -220,8 +220,8 @@ private: /** Is this replica "leading". The leader replica selects the parts to merge. */ - std::atomic_bool is_leader_node {false}; - std::mutex leader_node_mutex; + std::atomic is_leader {false}; + zkutil::LeaderElectionPtr leader_election; InterserverIOEndpointHolderPtr data_parts_exchange_endpoint_holder; @@ -239,7 +239,6 @@ private: DataPartsExchange::Fetcher fetcher; - zkutil::LeaderElectionPtr leader_election; /// When activated, replica is initialized and startup() method could exit Poco::Event startup_event; @@ -368,9 +367,15 @@ private: */ bool queueTask(); - /// Select the parts to merge. + /// Postcondition: + /// either leader_election is fully initialized (node in ZK is created and the watching thread is launched) + /// or an exception is thrown and leader_election is destroyed. + void enterLeaderElection(); - void becomeLeader(); + /// Postcondition: + /// is_leader is false, merge_selecting_thread is stopped, leader_election is nullptr. + /// leader_election node in ZK is either deleted, or the session is marked expired. + void exitLeaderElection(); /** Selects the parts to merge and writes to the log. */