From 4f46ac6b30f40939e3004f4ad683a776143b2cb7 Mon Sep 17 00:00:00 2001
From: tavplubix
Date: Tue, 7 Dec 2021 19:55:55 +0300
Subject: [PATCH] Remove LeaderElection (#32140)

* remove LeaderElection

* try fix tests

* Update test.py

* Update test.py
---
 src/Storages/MergeTree/LeaderElection.h       | 170 ++++++------------
 .../PartMovesBetweenShardsOrchestrator.cpp    |   1 +
 .../ReplicatedMergeTreeCleanupThread.cpp      |   2 +-
 .../ReplicatedMergeTreeRestartingThread.cpp   |   7 -
 src/Storages/StorageReplicatedMergeTree.cpp   |  60 +++----
 src/Storages/StorageReplicatedMergeTree.h     |  15 +-
 .../test_backward_compatibility/test.py       |  15 +-
 .../test.py                                   |   3 +
 8 files changed, 90 insertions(+), 183 deletions(-)

diff --git a/src/Storages/MergeTree/LeaderElection.h b/src/Storages/MergeTree/LeaderElection.h
index ccc5fada537..afaf2e7e841 100644
--- a/src/Storages/MergeTree/LeaderElection.h
+++ b/src/Storages/MergeTree/LeaderElection.h
@@ -1,9 +1,6 @@
 #pragma once
 
-#include 
-#include 
 #include 
-#include 
 #include 
 #include 
 #include 
@@ -17,135 +14,74 @@ namespace zkutil
  *
  * But then we decided to get rid of leader election, so every replica can become leader.
  * For now, every replica can become leader if there is no leader among replicas with old version.
- *
- * It's tempting to remove this class at all, but we have to maintain it,
- * to maintain compatibility when replicas with different versions work on the same cluster
- * (this is allowed for short time period during cluster update).
- *
- * Replicas with new versions creates ephemeral sequential nodes with values like "replica_name (multiple leaders Ok)".
- * If the first node belongs to a replica with new version, then all replicas with new versions become leaders.
  */
-class LeaderElection
+
+void checkNoOldLeaders(Poco::Logger * log, ZooKeeper & zookeeper, const String path)
 {
-public:
-    using LeadershipHandler = std::function;
+    /// Previous versions (before 21.12) used to create ephemeral sequential node path/leader_election-
+    /// Replica with the lexicographically smallest node name becomes leader (before 20.6) or enables multi-leader mode (since 20.6)
+    constexpr auto persistent_multiple_leaders = "leader_election-0"; /// Less than any sequential node
+    constexpr auto suffix = " (multiple leaders Ok)";
+    constexpr auto persistent_identifier = "all (multiple leaders Ok)";
 
-    /** handler is called when this instance become leader.
-      *
-      * identifier - if not empty, must uniquely (within same path) identify participant of leader election.
-      * It means that different participants of leader election have different identifiers
-      * and existence of more than one ephemeral node with same identifier indicates an error.
-      */
-    LeaderElection(
-        DB::BackgroundSchedulePool & pool_,
-        const std::string & path_,
-        ZooKeeper & zookeeper_,
-        LeadershipHandler handler_,
-        const std::string & identifier_)
-        : pool(pool_), path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_ + suffix)
-        , log_name("LeaderElection (" + path + ")")
-        , log(&Poco::Logger::get(log_name))
+    size_t num_tries = 1000;
+    while (num_tries--)
     {
-        task = pool.createTask(log_name, [this] { threadFunction(); });
-        createNode();
-    }
-
-    void shutdown()
-    {
-        if (shutdown_called)
+        Strings potential_leaders;
+        Coordination::Error code = zookeeper.tryGetChildren(path, potential_leaders);
+        /// NOTE zookeeper_path/leader_election node must exist now, but maybe we will remove it in future versions.
+ if (code == Coordination::Error::ZNONODE) return; + else if (code != Coordination::Error::ZOK) + throw KeeperException(code, path); - shutdown_called = true; - task->deactivate(); - } + Coordination::Requests ops; - ~LeaderElection() - { - releaseNode(); - } - -private: - static inline constexpr auto suffix = " (multiple leaders Ok)"; - DB::BackgroundSchedulePool & pool; - DB::BackgroundSchedulePool::TaskHolder task; - std::string path; - ZooKeeper & zookeeper; - LeadershipHandler handler; - std::string identifier; - std::string log_name; - Poco::Logger * log; - - EphemeralNodeHolderPtr node; - std::string node_name; - - std::atomic shutdown_called {false}; - - void createNode() - { - shutdown_called = false; - node = EphemeralNodeHolder::createSequential(fs::path(path) / "leader_election-", zookeeper, identifier); - - std::string node_path = node->getPath(); - node_name = node_path.substr(node_path.find_last_of('/') + 1); - - task->activateAndSchedule(); - } - - void releaseNode() - { - shutdown(); - node = nullptr; - } - - void threadFunction() - { - bool success = false; - - try + if (potential_leaders.empty()) { - Strings children = zookeeper.getChildren(path); - std::sort(children.begin(), children.end()); - - auto my_node_it = std::lower_bound(children.begin(), children.end(), node_name); - if (my_node_it == children.end() || *my_node_it != node_name) - throw Poco::Exception("Assertion failed in LeaderElection"); - - String value = zookeeper.get(path + "/" + children.front()); - - if (value.ends_with(suffix)) - { - handler(); + /// Ensure that no leaders appeared and enable persistent multi-leader mode + /// May fail with ZNOTEMPTY + ops.emplace_back(makeRemoveRequest(path, 0)); + ops.emplace_back(makeCreateRequest(path, "", zkutil::CreateMode::Persistent)); + /// May fail with ZNODEEXISTS + ops.emplace_back(makeCreateRequest(fs::path(path) / persistent_multiple_leaders, persistent_identifier, zkutil::CreateMode::Persistent)); + } + else + { + if (potential_leaders.front() == persistent_multiple_leaders) return; + + /// Ensure that current leader supports multi-leader mode and make it persistent + auto current_leader = fs::path(path) / potential_leaders.front(); + Coordination::Stat leader_stat; + String identifier; + if (!zookeeper.tryGet(current_leader, identifier, &leader_stat)) + { + LOG_INFO(log, "LeaderElection: leader suddenly changed, will retry"); + continue; } - if (my_node_it == children.begin()) - throw Poco::Exception("Assertion failed in LeaderElection"); + if (!identifier.ends_with(suffix)) + throw Poco::Exception(fmt::format("Found leader replica ({}) with too old version (< 20.6). Stop it before upgrading", identifier)); - /// Watch for the node in front of us. - --my_node_it; - std::string get_path_value; - if (!zookeeper.tryGetWatch(path + "/" + *my_node_it, get_path_value, nullptr, task->getWatchCallback())) - task->schedule(); - - success = true; - } - catch (const KeeperException & e) - { - DB::tryLogCurrentException(log); - - if (e.code == Coordination::Error::ZSESSIONEXPIRED) - return; - } - catch (...) - { - DB::tryLogCurrentException(log); + /// Version does not matter, just check that it still exists. 
+ /// May fail with ZNONODE + ops.emplace_back(makeCheckRequest(current_leader, leader_stat.version)); + /// May fail with ZNODEEXISTS + ops.emplace_back(makeCreateRequest(fs::path(path) / persistent_multiple_leaders, persistent_identifier, zkutil::CreateMode::Persistent)); } - if (!success) - task->scheduleAfter(10 * 1000); + Coordination::Responses res; + code = zookeeper.tryMulti(ops, res); + if (code == Coordination::Error::ZOK) + return; + else if (code == Coordination::Error::ZNOTEMPTY || code == Coordination::Error::ZNODEEXISTS || code == Coordination::Error::ZNONODE) + LOG_INFO(log, "LeaderElection: leader suddenly changed or new node appeared, will retry"); + else + KeeperMultiException::check(code, ops, res); } -}; -using LeaderElectionPtr = std::shared_ptr; + throw Poco::Exception("Cannot check that no old leaders exist"); +} } diff --git a/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp index b3a17250549..4d18adc1dfc 100644 --- a/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp +++ b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index 5731092f2a8..ff37a341205 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -2,9 +2,9 @@ #include #include #include +#include #include -#include #include diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 3bb592dcdcb..0cc6955ff72 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -197,11 +197,6 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() updateQuorumIfWeHavePart(); - if (storage_settings->replicated_can_become_leader) - storage.enterLeaderElection(); - else - LOG_INFO(log, "Will not enter leader election because replicated_can_become_leader=0"); - /// Anything above can throw a KeeperException if something is wrong with ZK. /// Anything below should not throw exceptions. 
@@ -380,8 +375,6 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown() LOG_TRACE(log, "Waiting for threads to finish"); - storage.exitLeaderElection(); - storage.queue_updating_task->deactivate(); storage.mutations_updating_task->deactivate(); storage.mutations_finalizing_task->deactivate(); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 7f600fc054c..852e2b10e6c 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include @@ -3400,53 +3401,29 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n } -void StorageReplicatedMergeTree::enterLeaderElection() +void StorageReplicatedMergeTree::startBeingLeader() { - auto callback = [this]() + if (!getSettings()->replicated_can_become_leader) { - LOG_INFO(log, "Became leader"); - - is_leader = true; - merge_selecting_task->activateAndSchedule(); - }; - - try - { - leader_election = std::make_shared( - getContext()->getSchedulePool(), - fs::path(zookeeper_path) / "leader_election", - *current_zookeeper, /// current_zookeeper lives for the lifetime of leader_election, - /// since before changing `current_zookeeper`, `leader_election` object is destroyed in `partialShutdown` method. - callback, - replica_name); - } - catch (...) - { - leader_election = nullptr; - throw; + LOG_INFO(log, "Will not enter leader election because replicated_can_become_leader=0"); + return; } + + zkutil::checkNoOldLeaders(log, *current_zookeeper, fs::path(zookeeper_path) / "leader_election"); + + LOG_INFO(log, "Became leader"); + is_leader = true; + merge_selecting_task->activateAndSchedule(); } -void StorageReplicatedMergeTree::exitLeaderElection() +void StorageReplicatedMergeTree::stopBeingLeader() { - if (!leader_election) + if (!is_leader) return; - /// Shut down the leader election thread to avoid suddenly becoming the leader again after - /// we have stopped the merge_selecting_thread, but before we have deleted the leader_election object. - leader_election->shutdown(); - - if (is_leader) - { - LOG_INFO(log, "Stopped being leader"); - - is_leader = false; - merge_selecting_task->deactivate(); - } - - /// Delete the node in ZK only after we have stopped the merge_selecting_thread - so that only one - /// replica assigns merges at any given time. - leader_election = nullptr; + LOG_INFO(log, "Stopped being leader"); + is_leader = false; + merge_selecting_task->deactivate(); } ConnectionTimeouts StorageReplicatedMergeTree::getFetchPartHTTPTimeouts(ContextPtr local_context) @@ -4109,10 +4086,12 @@ void StorageReplicatedMergeTree::startup() assert(prev_ptr == nullptr); getContext()->getInterserverIOHandler().addEndpoint(data_parts_exchange_ptr->getId(replica_path), data_parts_exchange_ptr); + startBeingLeader(); + /// In this thread replica will be activated. 
restarting_thread.start(); - /// Wait while restarting_thread initializes LeaderElection (and so on) or makes first attempt to do it + /// Wait while restarting_thread finishing initialization startup_event.wait(); startBackgroundMovesIfNeeded(); @@ -4145,6 +4124,7 @@ void StorageReplicatedMergeTree::shutdown() fetcher.blocker.cancelForever(); merger_mutator.merges_blocker.cancelForever(); parts_mover.moves_blocker.cancelForever(); + stopBeingLeader(); restarting_thread.shutdown(); background_operations_assignee.finish(); diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 65daf82a633..bcd364df30e 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -19,7 +19,6 @@ #include #include #include -#include #include #include #include @@ -320,7 +319,6 @@ private: * It can be false only when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders. */ std::atomic is_leader {false}; - zkutil::LeaderElectionPtr leader_election; InterserverIOEndpointPtr data_parts_exchange_endpoint; @@ -514,15 +512,10 @@ private: bool processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr entry); - /// Postcondition: - /// either leader_election is fully initialized (node in ZK is created and the watching thread is launched) - /// or an exception is thrown and leader_election is destroyed. - void enterLeaderElection(); - - /// Postcondition: - /// is_leader is false, merge_selecting_thread is stopped, leader_election is nullptr. - /// leader_election node in ZK is either deleted, or the session is marked expired. - void exitLeaderElection(); + /// Start being leader (if not disabled by setting). + /// Since multi-leaders are allowed, it just sets is_leader flag. + void startBeingLeader(); + void stopBeingLeader(); /** Selects the parts to merge and writes to the log. 
*/ diff --git a/tests/integration/test_backward_compatibility/test.py b/tests/integration/test_backward_compatibility/test.py index 71aedb78e5b..a8f4968956c 100644 --- a/tests/integration/test_backward_compatibility/test.py +++ b/tests/integration/test_backward_compatibility/test.py @@ -11,13 +11,14 @@ node2 = cluster.add_instance('node2', main_configs=['configs/wide_parts_only.xml def start_cluster(): try: cluster.start() - for i, node in enumerate([node1, node2]): - node.query_with_retry( - '''CREATE TABLE t(date Date, id UInt32) - ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/t', '{}') - PARTITION BY toYYYYMM(date) - ORDER BY id'''.format(i)) - + create_query = '''CREATE TABLE t(date Date, id UInt32) + ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/t', '{}') + PARTITION BY toYYYYMM(date) + ORDER BY id''' + node1.query(create_query.format(1)) + node1.query("DETACH TABLE t") # stop being leader + node2.query(create_query.format(2)) + node1.query("ATTACH TABLE t") yield cluster finally: diff --git a/tests/integration/test_version_update_after_mutation/test.py b/tests/integration/test_version_update_after_mutation/test.py index 8d38234ccdd..3c22f2ed380 100644 --- a/tests/integration/test_version_update_after_mutation/test.py +++ b/tests/integration/test_version_update_after_mutation/test.py @@ -36,6 +36,8 @@ def test_mutate_and_upgrade(start_cluster): node1.query("ALTER TABLE mt DELETE WHERE id = 2", settings={"mutations_sync": "2"}) node2.query("SYSTEM SYNC REPLICA mt", timeout=15) + node2.query("DETACH TABLE mt") # stop being leader + node1.query("DETACH TABLE mt") # stop being leader node1.restart_with_latest_version(signal=9) node2.restart_with_latest_version(signal=9) @@ -83,6 +85,7 @@ def test_upgrade_while_mutation(start_cluster): node3.query("SYSTEM STOP MERGES mt1") node3.query("ALTER TABLE mt1 DELETE WHERE id % 2 == 0") + node3.query("DETACH TABLE mt1") # stop being leader node3.restart_with_latest_version(signal=9) # checks for readonly