Remove LeaderElection (#32140)

* remove LeaderElection

* try fix tests

* Update test.py

* Update test.py
tavplubix 2021-12-07 19:55:55 +03:00 committed by GitHub
parent 514120adfe
commit 4f46ac6b30
8 changed files with 90 additions and 183 deletions

View File

@@ -1,9 +1,6 @@
#pragma once
#include <functional>
#include <memory>
#include <base/logger_useful.h>
#include <Common/CurrentMetrics.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Core/BackgroundSchedulePool.h>
@@ -17,135 +14,74 @@ namespace zkutil
*
* But then we decided to get rid of leader election, so every replica can become leader.
* For now, every replica can become leader if there is no leader among replicas with an old version.
*
* It's tempting to remove this class altogether, but we have to keep it
* to maintain compatibility when replicas with different versions work on the same cluster
* (this is allowed for a short period during a cluster update).
*
* Replicas with new versions create ephemeral sequential nodes with values like "replica_name (multiple leaders Ok)".
* If the first node belongs to a replica with a new version, then all replicas with new versions become leaders.
*/
class LeaderElection
void checkNoOldLeaders(Poco::Logger * log, ZooKeeper & zookeeper, const String & path)
{
public:
using LeadershipHandler = std::function<void()>;
/// Previous versions (before 21.12) used to create ephemeral sequential node path/leader_election-
/// Replica with the lexicographically smallest node name becomes leader (before 20.6) or enables multi-leader mode (since 20.6)
constexpr auto persistent_multiple_leaders = "leader_election-0"; /// Less than any sequential node
constexpr auto suffix = " (multiple leaders Ok)";
constexpr auto persistent_identifier = "all (multiple leaders Ok)";
/** handler is called when this instance becomes leader.
*
* identifier - if not empty, must uniquely (within the same path) identify a participant of the leader election.
* It means that different participants of the leader election have different identifiers
* and the existence of more than one ephemeral node with the same identifier indicates an error.
*/
LeaderElection(
DB::BackgroundSchedulePool & pool_,
const std::string & path_,
ZooKeeper & zookeeper_,
LeadershipHandler handler_,
const std::string & identifier_)
: pool(pool_), path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_ + suffix)
, log_name("LeaderElection (" + path + ")")
, log(&Poco::Logger::get(log_name))
size_t num_tries = 1000;
while (num_tries--)
{
task = pool.createTask(log_name, [this] { threadFunction(); });
createNode();
}
void shutdown()
{
if (shutdown_called)
Strings potential_leaders;
Coordination::Error code = zookeeper.tryGetChildren(path, potential_leaders);
/// NOTE zookeeper_path/leader_election node must exist now, but maybe we will remove it in future versions.
if (code == Coordination::Error::ZNONODE)
return;
else if (code != Coordination::Error::ZOK)
throw KeeperException(code, path);
shutdown_called = true;
task->deactivate();
}
Coordination::Requests ops;
~LeaderElection()
{
releaseNode();
}
private:
static inline constexpr auto suffix = " (multiple leaders Ok)";
DB::BackgroundSchedulePool & pool;
DB::BackgroundSchedulePool::TaskHolder task;
std::string path;
ZooKeeper & zookeeper;
LeadershipHandler handler;
std::string identifier;
std::string log_name;
Poco::Logger * log;
EphemeralNodeHolderPtr node;
std::string node_name;
std::atomic<bool> shutdown_called {false};
void createNode()
{
shutdown_called = false;
node = EphemeralNodeHolder::createSequential(fs::path(path) / "leader_election-", zookeeper, identifier);
std::string node_path = node->getPath();
node_name = node_path.substr(node_path.find_last_of('/') + 1);
task->activateAndSchedule();
}
void releaseNode()
{
shutdown();
node = nullptr;
}
void threadFunction()
{
bool success = false;
try
if (potential_leaders.empty())
{
Strings children = zookeeper.getChildren(path);
std::sort(children.begin(), children.end());
auto my_node_it = std::lower_bound(children.begin(), children.end(), node_name);
if (my_node_it == children.end() || *my_node_it != node_name)
throw Poco::Exception("Assertion failed in LeaderElection");
String value = zookeeper.get(path + "/" + children.front());
if (value.ends_with(suffix))
{
handler();
/// Ensure that no leaders appeared and enable persistent multi-leader mode
/// May fail with ZNOTEMPTY
ops.emplace_back(makeRemoveRequest(path, 0));
ops.emplace_back(makeCreateRequest(path, "", zkutil::CreateMode::Persistent));
/// May fail with ZNODEEXISTS
ops.emplace_back(makeCreateRequest(fs::path(path) / persistent_multiple_leaders, persistent_identifier, zkutil::CreateMode::Persistent));
}
else
{
if (potential_leaders.front() == persistent_multiple_leaders)
return;
/// Ensure that current leader supports multi-leader mode and make it persistent
auto current_leader = fs::path(path) / potential_leaders.front();
Coordination::Stat leader_stat;
String identifier;
if (!zookeeper.tryGet(current_leader, identifier, &leader_stat))
{
LOG_INFO(log, "LeaderElection: leader suddenly changed, will retry");
continue;
}
if (my_node_it == children.begin())
throw Poco::Exception("Assertion failed in LeaderElection");
if (!identifier.ends_with(suffix))
throw Poco::Exception(fmt::format("Found leader replica ({}) with too old version (< 20.6). Stop it before upgrading", identifier));
/// Watch for the node in front of us.
--my_node_it;
std::string get_path_value;
if (!zookeeper.tryGetWatch(path + "/" + *my_node_it, get_path_value, nullptr, task->getWatchCallback()))
task->schedule();
success = true;
}
catch (const KeeperException & e)
{
DB::tryLogCurrentException(log);
if (e.code == Coordination::Error::ZSESSIONEXPIRED)
return;
}
catch (...)
{
DB::tryLogCurrentException(log);
/// Version does not matter, just check that it still exists.
/// May fail with ZNONODE
ops.emplace_back(makeCheckRequest(current_leader, leader_stat.version));
/// May fail with ZNODEEXISTS
ops.emplace_back(makeCreateRequest(fs::path(path) / persistent_multiple_leaders, persistent_identifier, zkutil::CreateMode::Persistent));
}
if (!success)
task->scheduleAfter(10 * 1000);
Coordination::Responses res;
code = zookeeper.tryMulti(ops, res);
if (code == Coordination::Error::ZOK)
return;
else if (code == Coordination::Error::ZNOTEMPTY || code == Coordination::Error::ZNODEEXISTS || code == Coordination::Error::ZNONODE)
LOG_INFO(log, "LeaderElection: leader suddenly changed or new node appeared, will retry");
else
KeeperMultiException::check(code, ops, res);
}
};
using LeaderElectionPtr = std::shared_ptr<LeaderElection>;
throw Poco::Exception("Cannot check that no old leaders exist");
}
}
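
For reference, the rule that the new checkNoOldLeaders() enforces is the one described in the comment above: take the lexicographically smallest child of the leader_election node; if its value ends with the " (multiple leaders Ok)" suffix, every replica may act as leader, otherwise a pre-20.6 replica still holds classic single leadership. Below is a minimal standalone sketch of just that decision, using a hypothetical in-memory map in place of a real ZooKeeper client (multipleLeadersAllowed and the sample node values are illustrative, not part of the patch):

```cpp
#include <iostream>
#include <map>
#include <string>

/// Hypothetical stand-in for the children of zookeeper_path/leader_election:
/// node name -> node value. std::map keeps the names sorted, like the sorted
/// children list used by the removed LeaderElection::threadFunction().
using Children = std::map<std::string, std::string>;

/// True if every replica may act as leader (multi-leader mode), false if an
/// old-version (< 20.6) replica still holds classic single leadership.
bool multipleLeadersAllowed(const Children & children)
{
    static const std::string suffix = " (multiple leaders Ok)";
    if (children.empty())
        return true;    /// nobody is registered, so there is no old leader to wait for
    return children.begin()->second.ends_with(suffix);   /// C++20, same check as in the patch
}

int main()
{
    Children upgraded = {
        {"leader_election-0000000007", "replica_1 (multiple leaders Ok)"},
        {"leader_election-0000000009", "replica_2 (multiple leaders Ok)"},
    };
    Children old_leader = {
        {"leader_election-0000000003", "replica_running_20.5"},
    };
    std::cout << std::boolalpha << multipleLeadersAllowed(upgraded) << ' '
              << multipleLeadersAllowed(old_leader) << '\n';   // prints: true false
}
```

The patch itself performs the same check against live ZooKeeper state via tryGetChildren/tryGet, retries when the leader node changes underneath, and records the result by creating the persistent leader_election-0 node.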

View File

@@ -1,6 +1,7 @@
#include <Storages/MergeTree/PartMovesBetweenShardsOrchestrator.h>
#include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>

View File

@@ -2,9 +2,9 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Poco/Timestamp.h>
#include <Interpreters/Context.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <random>
#include <pcg_random.hpp>
#include <unordered_set>

View File

@@ -197,11 +197,6 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
updateQuorumIfWeHavePart();
if (storage_settings->replicated_can_become_leader)
storage.enterLeaderElection();
else
LOG_INFO(log, "Will not enter leader election because replicated_can_become_leader=0");
/// Anything above can throw a KeeperException if something is wrong with ZK.
/// Anything below should not throw exceptions.
@@ -380,8 +375,6 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
LOG_TRACE(log, "Waiting for threads to finish");
storage.exitLeaderElection();
storage.queue_updating_task->deactivate();
storage.mutations_updating_task->deactivate();
storage.mutations_finalizing_task->deactivate();

View File

@@ -32,6 +32,7 @@
#include <Storages/MergeTree/MutateFromLogEntryTask.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/MergeTree/MergeTreeReaderCompact.h>
#include <Storages/MergeTree/LeaderElection.h>
#include <Databases/IDatabase.h>
@@ -3400,53 +3401,29 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
}
void StorageReplicatedMergeTree::enterLeaderElection()
void StorageReplicatedMergeTree::startBeingLeader()
{
auto callback = [this]()
if (!getSettings()->replicated_can_become_leader)
{
LOG_INFO(log, "Became leader");
is_leader = true;
merge_selecting_task->activateAndSchedule();
};
try
{
leader_election = std::make_shared<zkutil::LeaderElection>(
getContext()->getSchedulePool(),
fs::path(zookeeper_path) / "leader_election",
*current_zookeeper, /// current_zookeeper lives for the lifetime of leader_election,
/// since before changing `current_zookeeper`, `leader_election` object is destroyed in `partialShutdown` method.
callback,
replica_name);
}
catch (...)
{
leader_election = nullptr;
throw;
LOG_INFO(log, "Will not enter leader election because replicated_can_become_leader=0");
return;
}
zkutil::checkNoOldLeaders(log, *current_zookeeper, fs::path(zookeeper_path) / "leader_election");
LOG_INFO(log, "Became leader");
is_leader = true;
merge_selecting_task->activateAndSchedule();
}
void StorageReplicatedMergeTree::exitLeaderElection()
void StorageReplicatedMergeTree::stopBeingLeader()
{
if (!leader_election)
if (!is_leader)
return;
/// Shut down the leader election thread to avoid suddenly becoming the leader again after
/// we have stopped the merge_selecting_thread, but before we have deleted the leader_election object.
leader_election->shutdown();
if (is_leader)
{
LOG_INFO(log, "Stopped being leader");
is_leader = false;
merge_selecting_task->deactivate();
}
/// Delete the node in ZK only after we have stopped the merge_selecting_thread - so that only one
/// replica assigns merges at any given time.
leader_election = nullptr;
LOG_INFO(log, "Stopped being leader");
is_leader = false;
merge_selecting_task->deactivate();
}
ConnectionTimeouts StorageReplicatedMergeTree::getFetchPartHTTPTimeouts(ContextPtr local_context)
@@ -4109,10 +4086,12 @@ void StorageReplicatedMergeTree::startup()
assert(prev_ptr == nullptr);
getContext()->getInterserverIOHandler().addEndpoint(data_parts_exchange_ptr->getId(replica_path), data_parts_exchange_ptr);
startBeingLeader();
/// In this thread replica will be activated.
restarting_thread.start();
/// Wait while restarting_thread initializes LeaderElection (and so on) or makes first attempt to do it
/// Wait until restarting_thread finishes initialization
startup_event.wait();
startBackgroundMovesIfNeeded();
@@ -4145,6 +4124,7 @@ void StorageReplicatedMergeTree::shutdown()
fetcher.blocker.cancelForever();
merger_mutator.merges_blocker.cancelForever();
parts_mover.moves_blocker.cancelForever();
stopBeingLeader();
restarting_thread.shutdown();
background_operations_assignee.finish();
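
With the election object gone, leadership is reduced to a local flag: startup() now calls startBeingLeader() before launching the restarting thread, and shutdown() calls stopBeingLeader() before restarting_thread.shutdown(). The following is a rough standalone model of that lifecycle; ReplicaSketch and its members are hypothetical stand-ins for the storage members shown in the diff, and checkNoOldLeaders() is stubbed out:

```cpp
#include <atomic>
#include <iostream>

/// Rough model of the simplified leadership lifecycle: no LeaderElection object,
/// just a local flag plus a one-time compatibility check against old replicas.
struct ReplicaSketch
{
    bool can_become_leader = true;          /// mirrors the replicated_can_become_leader setting
    std::atomic<bool> is_leader{false};
    bool merge_selecting_active = false;    /// stands in for merge_selecting_task

    /// Placeholder for zkutil::checkNoOldLeaders(): the real function throws if a
    /// replica older than 20.6 still holds classic single leadership in ZooKeeper.
    void checkNoOldLeaders() {}

    void startBeingLeader()
    {
        if (!can_become_leader)
        {
            std::cout << "Will not enter leader election because replicated_can_become_leader=0\n";
            return;
        }
        checkNoOldLeaders();
        std::cout << "Became leader\n";
        is_leader = true;
        merge_selecting_active = true;
    }

    void stopBeingLeader()
    {
        if (!is_leader)
            return;                         /// idempotent, so shutdown() can always call it
        std::cout << "Stopped being leader\n";
        is_leader = false;
        merge_selecting_active = false;
    }
};

int main()
{
    ReplicaSketch replica;
    replica.startBeingLeader();   /// in the patch: called from startup(), before restarting_thread.start()
    replica.stopBeingLeader();    /// in the patch: called from shutdown(), before restarting_thread.shutdown()
}
```

Because the flag is purely local, deactivating merge_selecting_task in stopBeingLeader() is what actually stops the replica from assigning merges; no ZooKeeper node has to be deleted anymore.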

View File

@@ -19,7 +19,6 @@
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <Storages/MergeTree/LeaderElection.h>
#include <Storages/MergeTree/PartMovesBetweenShardsOrchestrator.h>
#include <Storages/MergeTree/FutureMergedMutatedPart.h>
#include <Storages/MergeTree/MergeFromLogEntryTask.h>
@@ -320,7 +319,6 @@ private:
* It can be false only when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders.
*/
std::atomic<bool> is_leader {false};
zkutil::LeaderElectionPtr leader_election;
InterserverIOEndpointPtr data_parts_exchange_endpoint;
@@ -514,15 +512,10 @@ private:
bool processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr entry);
/// Postcondition:
/// either leader_election is fully initialized (node in ZK is created and the watching thread is launched)
/// or an exception is thrown and leader_election is destroyed.
void enterLeaderElection();
/// Postcondition:
/// is_leader is false, merge_selecting_thread is stopped, leader_election is nullptr.
/// leader_election node in ZK is either deleted, or the session is marked expired.
void exitLeaderElection();
/// Start being leader (if not disabled by setting).
/// Since multi-leaders are allowed, it just sets the is_leader flag.
void startBeingLeader();
void stopBeingLeader();
/** Selects the parts to merge and writes to the log.
*/

View File

@@ -11,13 +11,14 @@ node2 = cluster.add_instance('node2', main_configs=['configs/wide_parts_only.xml
def start_cluster():
try:
cluster.start()
for i, node in enumerate([node1, node2]):
node.query_with_retry(
'''CREATE TABLE t(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/t', '{}')
PARTITION BY toYYYYMM(date)
ORDER BY id'''.format(i))
create_query = '''CREATE TABLE t(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/t', '{}')
PARTITION BY toYYYYMM(date)
ORDER BY id'''
node1.query(create_query.format(1))
node1.query("DETACH TABLE t") # stop being leader
node2.query(create_query.format(2))
node1.query("ATTACH TABLE t")
yield cluster
finally:

View File

@@ -36,6 +36,8 @@ def test_mutate_and_upgrade(start_cluster):
node1.query("ALTER TABLE mt DELETE WHERE id = 2", settings={"mutations_sync": "2"})
node2.query("SYSTEM SYNC REPLICA mt", timeout=15)
node2.query("DETACH TABLE mt") # stop being leader
node1.query("DETACH TABLE mt") # stop being leader
node1.restart_with_latest_version(signal=9)
node2.restart_with_latest_version(signal=9)
@@ -83,6 +85,7 @@ def test_upgrade_while_mutation(start_cluster):
node3.query("SYSTEM STOP MERGES mt1")
node3.query("ALTER TABLE mt1 DELETE WHERE id % 2 == 0")
node3.query("DETACH TABLE mt1") # stop being leader
node3.restart_with_latest_version(signal=9)
# checks for readonly