Merge branch 'master' into mutation-livelock
commit f0d6fe53c9
@@ -1,9 +1,6 @@
#pragma once

#include <functional>
#include <memory>
#include <base/logger_useful.h>
#include <Common/CurrentMetrics.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Core/BackgroundSchedulePool.h>

@@ -17,135 +14,74 @@ namespace zkutil
*
* But then we decided to get rid of leader election, so every replica can become leader.
* For now, every replica can become leader if there is no leader among replicas with an old version.
*
* It's tempting to remove this class altogether, but we have to keep it
* to maintain compatibility when replicas with different versions work on the same cluster
* (this is allowed for a short time period during a cluster update).
*
* Replicas with new versions create ephemeral sequential nodes with values like "replica_name (multiple leaders Ok)".
* If the first node belongs to a replica with a new version, then all replicas with new versions become leaders.
*/
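The scheme described above can be made concrete with a small sketch (not part of this commit): a new-version replica announces multi-leader support by creating an ephemeral sequential node whose value carries the "(multiple leaders Ok)" marker, using the same EphemeralNodeHolder::createSequential helper that appears in the class below. The function name and its arguments are illustrative placeholders.

#include <Common/ZooKeeper/ZooKeeper.h>
#include <string>

/// Illustrative only: register this replica as a multi-leader-capable participant.
/// The returned holder keeps the ephemeral node alive for the duration of the ZooKeeper session.
zkutil::EphemeralNodeHolderPtr announceMultiLeaderSupport(
    zkutil::ZooKeeper & zookeeper, const std::string & election_path, const std::string & replica_name)
{
    return zkutil::EphemeralNodeHolder::createSequential(
        election_path + "/leader_election-", zookeeper, replica_name + " (multiple leaders Ok)");
}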
class LeaderElection

void checkNoOldLeaders(Poco::Logger * log, ZooKeeper & zookeeper, const String path)
{
public:
using LeadershipHandler = std::function<void()>;

/// Previous versions (before 21.12) used to create ephemeral sequential node path/leader_election-
/// Replica with the lexicographically smallest node name becomes leader (before 20.6) or enables multi-leader mode (since 20.6)
constexpr auto persistent_multiple_leaders = "leader_election-0"; /// Less than any sequential node
constexpr auto suffix = " (multiple leaders Ok)";
constexpr auto persistent_identifier = "all (multiple leaders Ok)";
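A note on the "Less than any sequential node" comment: ZooKeeper appends a ten-digit counter to sequential nodes (e.g. "leader_election-0000000001"), and "leader_election-0" is a proper prefix of every such name, so it compares lexicographically smaller and sorts in front of all ephemeral election nodes. A minimal, illustrative check of that assumption (not part of the commit):

#include <cassert>
#include <string>

int main()
{
    const std::string persistent_marker = "leader_election-0";          /// persistent_multiple_leaders
    const std::string sequential_node = "leader_election-0000000001";   /// example name produced by ZooKeeper
    /// A proper prefix compares less than the longer string, so the marker always sorts first.
    assert(persistent_marker < sequential_node);
    return 0;
}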
/** handler is called when this instance becomes leader.
*
* identifier - if not empty, must uniquely (within the same path) identify a participant of the leader election.
* It means that different participants of the leader election have different identifiers
* and the existence of more than one ephemeral node with the same identifier indicates an error.
*/
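For reference, the removed class was constructed with a schedule pool, an election path, a ZooKeeper session, a leadership callback, and the replica name; a condensed sketch of the old call pattern (the real, now-deleted call site appears further down in StorageReplicatedMergeTree::enterLeaderElection), with placeholder names:

#include <Storages/MergeTree/LeaderElection.h>
#include <Core/BackgroundSchedulePool.h>

/// Sketch of the old (now removed) usage; parameter names are placeholders.
std::shared_ptr<zkutil::LeaderElection> startOldElection(
    DB::BackgroundSchedulePool & pool,
    zkutil::ZooKeeper & zookeeper,
    const std::string & zookeeper_path,
    const std::string & replica_name)
{
    auto on_leadership = [] { /* became leader: start assigning merges */ };
    /// The class appends the " (multiple leaders Ok)" suffix to the identifier internally.
    return std::make_shared<zkutil::LeaderElection>(
        pool, zookeeper_path + "/leader_election", zookeeper, on_leadership, replica_name);
}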
LeaderElection(
DB::BackgroundSchedulePool & pool_,
const std::string & path_,
ZooKeeper & zookeeper_,
LeadershipHandler handler_,
const std::string & identifier_)
: pool(pool_), path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_ + suffix)
, log_name("LeaderElection (" + path + ")")
, log(&Poco::Logger::get(log_name))
size_t num_tries = 1000;
while (num_tries--)
{
task = pool.createTask(log_name, [this] { threadFunction(); });
createNode();
}

void shutdown()
{
if (shutdown_called)
Strings potential_leaders;
Coordination::Error code = zookeeper.tryGetChildren(path, potential_leaders);
/// NOTE zookeeper_path/leader_election node must exist now, but maybe we will remove it in future versions.
if (code == Coordination::Error::ZNONODE)
return;
else if (code != Coordination::Error::ZOK)
throw KeeperException(code, path);

shutdown_called = true;
task->deactivate();
}
Coordination::Requests ops;

~LeaderElection()
{
releaseNode();
}

private:
static inline constexpr auto suffix = " (multiple leaders Ok)";
DB::BackgroundSchedulePool & pool;
DB::BackgroundSchedulePool::TaskHolder task;
std::string path;
ZooKeeper & zookeeper;
LeadershipHandler handler;
std::string identifier;
std::string log_name;
Poco::Logger * log;

EphemeralNodeHolderPtr node;
std::string node_name;

std::atomic<bool> shutdown_called {false};

void createNode()
{
shutdown_called = false;
node = EphemeralNodeHolder::createSequential(fs::path(path) / "leader_election-", zookeeper, identifier);

std::string node_path = node->getPath();
node_name = node_path.substr(node_path.find_last_of('/') + 1);

task->activateAndSchedule();
}

void releaseNode()
{
shutdown();
node = nullptr;
}

void threadFunction()
{
bool success = false;

try
if (potential_leaders.empty())
{
Strings children = zookeeper.getChildren(path);
std::sort(children.begin(), children.end());

auto my_node_it = std::lower_bound(children.begin(), children.end(), node_name);
if (my_node_it == children.end() || *my_node_it != node_name)
throw Poco::Exception("Assertion failed in LeaderElection");

String value = zookeeper.get(path + "/" + children.front());

if (value.ends_with(suffix))
{
handler();
/// Ensure that no leaders appeared and enable persistent multi-leader mode
/// May fail with ZNOTEMPTY
ops.emplace_back(makeRemoveRequest(path, 0));
ops.emplace_back(makeCreateRequest(path, "", zkutil::CreateMode::Persistent));
/// May fail with ZNODEEXISTS
ops.emplace_back(makeCreateRequest(fs::path(path) / persistent_multiple_leaders, persistent_identifier, zkutil::CreateMode::Persistent));
}
else
{
if (potential_leaders.front() == persistent_multiple_leaders)
return;

/// Ensure that current leader supports multi-leader mode and make it persistent
auto current_leader = fs::path(path) / potential_leaders.front();
Coordination::Stat leader_stat;
String identifier;
if (!zookeeper.tryGet(current_leader, identifier, &leader_stat))
{
LOG_INFO(log, "LeaderElection: leader suddenly changed, will retry");
continue;
}

if (my_node_it == children.begin())
throw Poco::Exception("Assertion failed in LeaderElection");
if (!identifier.ends_with(suffix))
throw Poco::Exception(fmt::format("Found leader replica ({}) with too old version (< 20.6). Stop it before upgrading", identifier));

/// Watch for the node in front of us.
--my_node_it;
std::string get_path_value;
if (!zookeeper.tryGetWatch(path + "/" + *my_node_it, get_path_value, nullptr, task->getWatchCallback()))
task->schedule();

success = true;
}
catch (const KeeperException & e)
{
DB::tryLogCurrentException(log);

if (e.code == Coordination::Error::ZSESSIONEXPIRED)
return;
}
catch (...)
{
DB::tryLogCurrentException(log);
/// Version does not matter, just check that it still exists.
/// May fail with ZNONODE
ops.emplace_back(makeCheckRequest(current_leader, leader_stat.version));
/// May fail with ZNODEEXISTS
ops.emplace_back(makeCreateRequest(fs::path(path) / persistent_multiple_leaders, persistent_identifier, zkutil::CreateMode::Persistent));
}

if (!success)
task->scheduleAfter(10 * 1000);
Coordination::Responses res;
code = zookeeper.tryMulti(ops, res);
if (code == Coordination::Error::ZOK)
return;
else if (code == Coordination::Error::ZNOTEMPTY || code == Coordination::Error::ZNODEEXISTS || code == Coordination::Error::ZNONODE)
LOG_INFO(log, "LeaderElection: leader suddenly changed or new node appeared, will retry");
else
KeeperMultiException::check(code, ops, res);
}
};

using LeaderElectionPtr = std::shared_ptr<LeaderElection>;
throw Poco::Exception("Cannot check that no old leaders exist");
}

}
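To connect this to the rest of the diff: checkNoOldLeaders replaces the per-replica election loop above with a one-shot compatibility check at startup. All of its ZooKeeper writes go through a single tryMulti, so two replicas racing to install the persistent marker simply retry on ZNOTEMPTY/ZNODEEXISTS/ZNONODE instead of corrupting the path. A hedged sketch of a call site follows (the real call site added by this commit is StorageReplicatedMergeTree::startBeingLeader, shown below); the function and variable names here are placeholders:

#include <Storages/MergeTree/LeaderElection.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <base/logger_useful.h>

/// Illustrative caller: run once while starting a replicated table.
/// Throws if a leader replica older than 20.6 is still running.
void ensureSafeToActAsLeader(zkutil::ZooKeeper & zookeeper, const std::string & zookeeper_path)
{
    auto * log = &Poco::Logger::get("LeaderElection");
    zkutil::checkNoOldLeaders(log, zookeeper, zookeeper_path + "/leader_election");
}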
@@ -1,6 +1,7 @@
#include <Storages/MergeTree/PartMovesBetweenShardsOrchestrator.h>
#include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>

@@ -2,9 +2,9 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Poco/Timestamp.h>
#include <Interpreters/Context.h>
#include <Common/ZooKeeper/KeeperException.h>

#include <random>
#include <pcg_random.hpp>
#include <unordered_set>
@@ -197,11 +197,6 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()

updateQuorumIfWeHavePart();

if (storage_settings->replicated_can_become_leader)
storage.enterLeaderElection();
else
LOG_INFO(log, "Will not enter leader election because replicated_can_become_leader=0");

/// Anything above can throw a KeeperException if something is wrong with ZK.
/// Anything below should not throw exceptions.

@@ -380,8 +375,6 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()

LOG_TRACE(log, "Waiting for threads to finish");

storage.exitLeaderElection();

storage.queue_updating_task->deactivate();
storage.mutations_updating_task->deactivate();
storage.mutations_finalizing_task->deactivate();

@@ -32,6 +32,7 @@
#include <Storages/MergeTree/MutateFromLogEntryTask.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/MergeTree/MergeTreeReaderCompact.h>
#include <Storages/MergeTree/LeaderElection.h>

#include <Databases/IDatabase.h>
@@ -3400,53 +3401,29 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
}

void StorageReplicatedMergeTree::enterLeaderElection()
void StorageReplicatedMergeTree::startBeingLeader()
{
auto callback = [this]()
if (!getSettings()->replicated_can_become_leader)
{
LOG_INFO(log, "Became leader");

is_leader = true;
merge_selecting_task->activateAndSchedule();
};

try
{
leader_election = std::make_shared<zkutil::LeaderElection>(
getContext()->getSchedulePool(),
fs::path(zookeeper_path) / "leader_election",
*current_zookeeper, /// current_zookeeper lives for the lifetime of leader_election,
/// since before changing `current_zookeeper`, `leader_election` object is destroyed in `partialShutdown` method.
callback,
replica_name);
}
catch (...)
{
leader_election = nullptr;
throw;
LOG_INFO(log, "Will not enter leader election because replicated_can_become_leader=0");
return;
}

zkutil::checkNoOldLeaders(log, *current_zookeeper, fs::path(zookeeper_path) / "leader_election");

LOG_INFO(log, "Became leader");
is_leader = true;
merge_selecting_task->activateAndSchedule();
}

void StorageReplicatedMergeTree::exitLeaderElection()
void StorageReplicatedMergeTree::stopBeingLeader()
{
if (!leader_election)
if (!is_leader)
return;

/// Shut down the leader election thread to avoid suddenly becoming the leader again after
/// we have stopped the merge_selecting_thread, but before we have deleted the leader_election object.
leader_election->shutdown();

if (is_leader)
{
LOG_INFO(log, "Stopped being leader");

is_leader = false;
merge_selecting_task->deactivate();
}

/// Delete the node in ZK only after we have stopped the merge_selecting_thread - so that only one
/// replica assigns merges at any given time.
leader_election = nullptr;
LOG_INFO(log, "Stopped being leader");
is_leader = false;
merge_selecting_task->deactivate();
}
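After this change, leadership on the C++ side reduces to an atomic flag plus the one-time compatibility check; there is no longer a leader_election object to create, watch, or destroy. A toy model of that reduced state (illustrative only; member and setting names are borrowed from the surrounding code):

#include <atomic>

/// Toy model: any replica may become leader (multi-leader mode),
/// unless replicated_can_become_leader is disabled for it.
struct LeadershipState
{
    std::atomic<bool> is_leader {false};

    void startBeingLeader(bool replicated_can_become_leader)
    {
        if (!replicated_can_become_leader)
            return;
        /// In the real code, zkutil::checkNoOldLeaders() runs here before the flag is set.
        is_leader = true;
    }

    void stopBeingLeader()
    {
        if (!is_leader)
            return;
        is_leader = false;
    }
};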
ConnectionTimeouts StorageReplicatedMergeTree::getFetchPartHTTPTimeouts(ContextPtr local_context)

@@ -4109,10 +4086,12 @@ void StorageReplicatedMergeTree::startup()
assert(prev_ptr == nullptr);
getContext()->getInterserverIOHandler().addEndpoint(data_parts_exchange_ptr->getId(replica_path), data_parts_exchange_ptr);

startBeingLeader();

/// In this thread replica will be activated.
restarting_thread.start();

/// Wait while restarting_thread initializes LeaderElection (and so on) or makes first attempt to do it
/// Wait while restarting_thread finishes initialization
startup_event.wait();
startBackgroundMovesIfNeeded();

@@ -4145,6 +4124,7 @@ void StorageReplicatedMergeTree::shutdown()
fetcher.blocker.cancelForever();
merger_mutator.merges_blocker.cancelForever();
parts_mover.moves_blocker.cancelForever();
stopBeingLeader();

restarting_thread.shutdown();
background_operations_assignee.finish();
@@ -19,7 +19,6 @@
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <Storages/MergeTree/LeaderElection.h>
#include <Storages/MergeTree/PartMovesBetweenShardsOrchestrator.h>
#include <Storages/MergeTree/FutureMergedMutatedPart.h>
#include <Storages/MergeTree/MergeFromLogEntryTask.h>

@@ -320,7 +319,6 @@ private:
* It can be false only when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders.
*/
std::atomic<bool> is_leader {false};
zkutil::LeaderElectionPtr leader_election;

InterserverIOEndpointPtr data_parts_exchange_endpoint;

@@ -514,15 +512,10 @@ private:

bool processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr entry);

/// Postcondition:
/// either leader_election is fully initialized (node in ZK is created and the watching thread is launched)
/// or an exception is thrown and leader_election is destroyed.
void enterLeaderElection();

/// Postcondition:
/// is_leader is false, merge_selecting_thread is stopped, leader_election is nullptr.
/// leader_election node in ZK is either deleted, or the session is marked expired.
void exitLeaderElection();
/// Start being leader (if not disabled by setting).
/// Since multi-leaders are allowed, it just sets is_leader flag.
void startBeingLeader();
void stopBeingLeader();

/** Selects the parts to merge and writes to the log.
*/
@@ -11,13 +11,14 @@ node2 = cluster.add_instance('node2', main_configs=['configs/wide_parts_only.xml
def start_cluster():
try:
cluster.start()
for i, node in enumerate([node1, node2]):
node.query_with_retry(
'''CREATE TABLE t(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/t', '{}')
PARTITION BY toYYYYMM(date)
ORDER BY id'''.format(i))

create_query = '''CREATE TABLE t(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/t', '{}')
PARTITION BY toYYYYMM(date)
ORDER BY id'''
node1.query(create_query.format(1))
node1.query("DETACH TABLE t")  # stop being leader
node2.query(create_query.format(2))
node1.query("ATTACH TABLE t")
yield cluster

finally:
@@ -36,6 +36,8 @@ def test_mutate_and_upgrade(start_cluster):
node1.query("ALTER TABLE mt DELETE WHERE id = 2", settings={"mutations_sync": "2"})
node2.query("SYSTEM SYNC REPLICA mt", timeout=15)

node2.query("DETACH TABLE mt")  # stop being leader
node1.query("DETACH TABLE mt")  # stop being leader
node1.restart_with_latest_version(signal=9)
node2.restart_with_latest_version(signal=9)

@@ -83,6 +85,7 @@ def test_upgrade_while_mutation(start_cluster):
node3.query("SYSTEM STOP MERGES mt1")
node3.query("ALTER TABLE mt1 DELETE WHERE id % 2 == 0")

node3.query("DETACH TABLE mt1")  # stop being leader
node3.restart_with_latest_version(signal=9)

# checks for readonly