Fix tests

Antonio Andelic 2022-04-19 08:08:13 +00:00
parent cd548aeb30
commit 608c0996d0
4 changed files with 130 additions and 42 deletions


@@ -547,6 +547,13 @@ void KeeperDispatcher::updateConfigurationThread()
continue;
}
if (server->isRecovering())
{
LOG_INFO(log, "Server is recovering, will not apply configuration until recovery is finished");
std::this_thread::sleep_for(std::chrono::milliseconds(5000));
continue;
}
ConfigUpdateAction action;
if (!update_configuration_queue.pop(action))
break;
@@ -556,6 +563,9 @@ void KeeperDispatcher::updateConfigurationThread()
bool done = false;
while (!done)
{
if (server->isRecovering())
break;
if (shutdown_called)
return;
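For readers skimming the hunk above: the dispatcher's configuration thread now backs off while the server is recovering, sleeping and re-checking instead of popping and applying updates. A minimal standalone sketch of that back-off pattern, with a hypothetical `Server` type and a plain queue standing in for the ClickHouse classes:

```cpp
#include <atomic>
#include <chrono>
#include <deque>
#include <iostream>
#include <thread>

// Hypothetical stand-in for the real server: it only exposes a recovery flag.
struct Server
{
    std::atomic<bool> recovering{true};
    bool isRecovering() const { return recovering.load(); }
};

// Hypothetical loop mirroring the back-off pattern from the hunk above:
// while the server is recovering, sleep and retry instead of popping and
// applying the next configuration update.
void updateConfigurationLoop(Server & server, std::deque<int> & updates, const std::atomic<bool> & shutdown_called)
{
    while (!shutdown_called)
    {
        if (server.isRecovering())
        {
            std::cout << "Server is recovering, will not apply configuration\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            continue;
        }

        if (updates.empty())
            break;

        int action = updates.front();
        updates.pop_front();
        std::cout << "Applying configuration action " << action << '\n';
    }
}

int main()
{
    Server server;
    server.recovering = false;  // pretend recovery already finished
    std::deque<int> updates{1, 2, 3};
    std::atomic<bool> shutdown_called{false};
    updateConfigurationLoop(server, updates, shutdown_called);
}
```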


@@ -14,6 +14,7 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <boost/algorithm/string.hpp>
#include <libnuraft/cluster_config.hxx>
#include <libnuraft/raft_server.hxx>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
@@ -139,6 +140,20 @@ struct KeeperServer::KeeperRaftServer : public nuraft::raft_server
return not_responding_peers <= max_not_responding_peers;
}
// Manually set the internal config of the raft server
// This should be used only for recovery
void setConfig(const nuraft::ptr<nuraft::cluster_config> & new_config)
{
set_config(new_config);
}
// Manually reconfigure the cluster
// This should be used only for recovery
void forceReconfigure(const nuraft::ptr<nuraft::cluster_config> & new_config)
{
reconfigure(new_config);
}
using nuraft::raft_server::raft_server;
// peers are initially marked as responding because at least one cycle
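The `setConfig`/`forceReconfigure` wrappers above exist because the corresponding `set_config` and `reconfigure` calls are not part of `nuraft::raft_server`'s public interface, so Keeper's private `KeeperRaftServer` subclass re-exports them for the recovery path only. A minimal sketch of that access-widening pattern, using a hypothetical base class rather than NuRaft itself:

```cpp
#include <iostream>

// Hypothetical stand-in for a third-party class that keeps the operations
// we need behind protected access (as nuraft::raft_server does).
class RaftCore
{
protected:
    void set_config(int idx) { config_idx = idx; }
    void reconfigure(int idx) { std::cout << "reconfigured to config " << idx << '\n'; }
    int config_idx = 0;

public:
    int configIndex() const { return config_idx; }
};

// Thin subclass that re-exports the protected operations under explicit,
// intention-revealing names, mirroring the setConfig/forceReconfigure
// wrappers in the diff. These should only be used on the recovery path.
class RecoverableRaftCore : public RaftCore
{
public:
    void setConfig(int idx) { set_config(idx); }
    void forceReconfigure(int idx) { reconfigure(idx); }
};

int main()
{
    RecoverableRaftCore core;
    core.setConfig(42);
    core.forceReconfigure(core.configIndex());
}
```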
@@ -182,11 +197,33 @@ void KeeperServer::loadLatestConfig()
}
}
void KeeperServer::recoveryMode(nuraft::raft_params & params)
{
LOG_WARNING(
log,
"This instance is in recovery mode. Until the quorum is restored, no requests should be sent to any "
"of the cluster instances. This instance will start accepting requests only when the recovery is finished.");
auto latest_config = state_manager->load_config();
nuraft::ptr<nuraft::cluster_config> new_config = std::make_shared<nuraft::cluster_config>(0, latest_config ? latest_config->get_log_idx() : 0);
new_config->set_log_idx(state_manager->load_log_store()->next_slot());
new_config->get_servers() = last_local_config->get_servers();
state_manager->save_config(*new_config);
params.with_custom_commit_quorum_size(1);
params.with_custom_election_quorum_size(1);
}
void KeeperServer::forceRecovery()
{
shutdownRaftServer();
is_recovering = true;
launchRaftServer(true);
std::lock_guard lock{server_mutex};
auto params = raft_instance->get_current_params();
recoveryMode(params);
raft_instance->setConfig(state_manager->load_config());
raft_instance->update_params(params);
}
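Together, `recoveryMode` and `forceRecovery` implement a "shrink the quorum to one" escape hatch: rebuild a cluster config from the last locally known membership, point its log index past the local log, persist it, and drop both the commit and election quorums to a single vote so this instance can make progress alone. A compressed, NuRaft-free sketch of that sequence (all types are illustrative stand-ins, not the real NuRaft structures):

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

// Illustrative stand-ins for the Raft parameters and cluster config.
struct RaftParams
{
    int commit_quorum = 0;    // 0 means "use the normal majority rule"
    int election_quorum = 0;
};

struct ClusterConfig
{
    std::vector<int> servers;  // ids of the servers kept in the recovered config
    uint64_t log_idx = 0;
};

// Sketch of the recovery sequence from the diff: rebuild the config from the
// last locally known membership, bump its log index to the next local log
// slot, and force single-vote quorums so this instance can commit alone.
ClusterConfig enterRecoveryMode(const std::vector<int> & last_local_servers, uint64_t next_log_slot, RaftParams & params)
{
    ClusterConfig new_config;
    new_config.servers = last_local_servers;
    new_config.log_idx = next_log_slot;

    params.commit_quorum = 1;
    params.election_quorum = 1;
    return new_config;  // the real code persists this via the state manager
}

int main()
{
    RaftParams params;
    ClusterConfig config = enterRecoveryMode({1, 2, 3}, 100, params);
    std::cout << "recovered config keeps " << config.servers.size()
              << " servers at log index " << config.log_idx << '\n';
}
```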
void KeeperServer::launchRaftServer(bool enable_ipv6)
@@ -220,26 +257,14 @@ void KeeperServer::launchRaftServer(bool enable_ipv6)
#if USE_SSL
setSSLParams(asio_opts);
#else
throw Exception{
"SSL support for NuRaft is disabled because ClickHouse was built without SSL support.", ErrorCodes::SUPPORT_IS_DISABLED};
throw Exception(
"SSL support for NuRaft is disabled because ClickHouse was built without SSL support.", ErrorCodes::SUPPORT_IS_DISABLED);
#endif
}
if (is_recovering)
{
LOG_WARNING(
log,
"This instance is in recovery mode. Until the quorum is restored, no requests should be sent to any "
"of the cluster instances. This instance will start accepting requests only when the recovery is finished.");
params.with_custom_commit_quorum_size(1);
params.with_custom_election_quorum_size(1);
auto latest_config = state_manager->load_config();
auto new_config = std::make_shared<nuraft::cluster_config>(0, latest_config ? latest_config->get_log_idx() : 0);
new_config->get_servers() = last_local_config->get_servers();
new_config->set_log_idx(state_manager->getLogStore()->next_slot());
state_manager->save_config(*new_config);
recoveryMode(params);
}
nuraft::raft_server::init_options init_options;
@@ -248,7 +273,7 @@ void KeeperServer::launchRaftServer(bool enable_ipv6)
init_options.start_server_in_constructor_ = false;
init_options.raft_callback_ = [this](nuraft::cb_func::Type type, nuraft::cb_func::Param * param) { return callbackFunc(type, param); };
nuraft::ptr<nuraft::logger> logger = nuraft::cs_new<LoggerWrapper>("RaftInstance", DB::LogsLevel::information);
nuraft::ptr<nuraft::logger> logger = nuraft::cs_new<LoggerWrapper>("RaftInstance", coordination_settings->raft_logs_level);
asio_service = nuraft::cs_new<nuraft::asio_service>(asio_opts, logger);
asio_listener = asio_service->create_rpc_listener(state_manager->getPort(), logger, enable_ipv6);
@@ -360,6 +385,10 @@ RaftAppendResult KeeperServer::putRequestBatch(const KeeperStorage::RequestsForS
for (const auto & [session_id, time, request] : requests_for_sessions)
entries.push_back(getZooKeeperLogEntry(session_id, time, request));
std::lock_guard lock{server_mutex};
if (is_recovering)
return nullptr;
return raft_instance->append_entries(entries);
}
@@ -410,16 +439,32 @@ uint64_t KeeperServer::getSyncedFollowerCount() const
nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * param)
{
if (type == nuraft::cb_func::HeartBeat && is_recovering && raft_instance->isClusterHealthy())
if (is_recovering)
{
auto new_params = raft_instance->get_current_params();
new_params.custom_commit_quorum_size_ = 0;
new_params.custom_election_quorum_size_ = 0;
raft_instance->update_params(new_params);
if (type == nuraft::cb_func::HeartBeat && raft_instance->isClusterHealthy())
{
auto new_params = raft_instance->get_current_params();
new_params.custom_commit_quorum_size_ = 0;
new_params.custom_election_quorum_size_ = 0;
raft_instance->update_params(new_params);
LOG_INFO(log, "Recovery is done. You can continue using cluster normally.");
is_recovering = false;
return nuraft::cb_func::ReturnCode::Ok;
LOG_INFO(log, "Recovery is done. You can continue using cluster normally.");
is_recovering = false;
return nuraft::cb_func::ReturnCode::Ok;
}
if (type == nuraft::cb_func::NewConfig)
{
// Apply the manually set config when in recovery mode.
// NuRaft will commit the config but skip the reconfigure step
// if the current config is the same as the committed one.
// Because we manually set the config that gets committed,
// we need to trigger the reconfigure ourselves.
uint64_t log_idx = *static_cast<uint64_t*>(param->ctx);
if (log_idx == state_manager->load_config()->get_log_idx())
raft_instance->forceReconfigure(state_manager->load_config());
return nuraft::cb_func::ReturnCode::Ok;
}
}
if (initialized_flag)
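The callback hunk above handles two events while recovering: on a heartbeat, if the cluster looks healthy again, normal quorum sizes are restored and the recovery flag is cleared; on a new-config event, the manually installed config is re-applied via `forceReconfigure`, since NuRaft would otherwise skip reconfiguration when the committed config equals the current one. A simplified sketch of that branching, with illustrative types in place of `nuraft::cb_func`:

```cpp
#include <cstdint>
#include <iostream>

// Illustrative callback types standing in for nuraft::cb_func.
enum class CallbackType { HeartBeat, NewConfig, Other };

struct RecoveryState
{
    bool is_recovering = true;
    uint64_t manually_set_config_idx = 0;
};

// Sketch of the recovery branch of the callback: heartbeats end recovery once
// the cluster looks healthy; a new-config event whose log index matches the
// manually installed config triggers an explicit reconfigure, because NuRaft
// would otherwise skip it.
void onRaftCallback(CallbackType type, uint64_t committed_config_idx, bool cluster_healthy, RecoveryState & state)
{
    if (!state.is_recovering)
        return;

    if (type == CallbackType::HeartBeat && cluster_healthy)
    {
        std::cout << "Recovery is done, restoring normal quorum sizes\n";
        state.is_recovering = false;
    }
    else if (type == CallbackType::NewConfig && committed_config_idx == state.manually_set_config_idx)
    {
        std::cout << "Forcing reconfigure to the manually set config\n";
    }
}

int main()
{
    RecoveryState state{true, 100};
    onRaftCallback(CallbackType::NewConfig, 100, false, state);
    onRaftCallback(CallbackType::HeartBeat, 100, true, state);
}
```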
@@ -499,19 +544,26 @@ ConfigUpdateActions KeeperServer::getConfigurationDiff(const Poco::Util::Abstrac
auto diff = state_manager->getConfigurationDiff(config);
if (!diff.empty())
{
std::lock_guard lock{server_mutex};
last_local_config = state_manager->parseServersConfiguration(config, true).cluster_config;
}
return diff;
}
void KeeperServer::applyConfigurationUpdate(const ConfigUpdateAction & task)
{
std::lock_guard lock{server_mutex};
if (is_recovering)
return;
size_t sleep_ms = 500;
if (task.action_type == ConfigUpdateActionType::AddServer)
{
LOG_INFO(log, "Will try to add server with id {}", task.server->get_id());
bool added = false;
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count; ++i)
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count && !is_recovering; ++i)
{
if (raft_instance->get_srv_config(task.server->get_id()) != nullptr)
{
@@ -560,7 +612,7 @@ void KeeperServer::applyConfigurationUpdate(const ConfigUpdateAction & task)
return;
}
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count; ++i)
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count && !is_recovering; ++i)
{
if (raft_instance->get_srv_config(task.server->get_id()) == nullptr)
{
@@ -602,11 +654,15 @@ void KeeperServer::applyConfigurationUpdate(const ConfigUpdateAction & task)
bool KeeperServer::waitConfigurationUpdate(const ConfigUpdateAction & task)
{
std::lock_guard lock{server_mutex};
if (is_recovering)
return false;
size_t sleep_ms = 500;
if (task.action_type == ConfigUpdateActionType::AddServer)
{
LOG_INFO(log, "Will try to wait server with id {} to be added", task.server->get_id());
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count; ++i)
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count && !is_recovering; ++i)
{
if (raft_instance->get_srv_config(task.server->get_id()) != nullptr)
{
@@ -628,7 +684,7 @@ bool KeeperServer::waitConfigurationUpdate(const ConfigUpdateAction & task)
{
LOG_INFO(log, "Will try to wait remove of server with id {}", task.server->get_id());
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count; ++i)
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count && !is_recovering; ++i)
{
if (raft_instance->get_srv_config(task.server->get_id()) == nullptr)
{
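The remaining hunks extend the existing bounded retry loops in `applyConfigurationUpdate` and `waitConfigurationUpdate` with an `!is_recovering` condition, so a pending add- or remove-server task gives up as soon as recovery starts. The pattern is a retry loop with an extra cancellation flag, sketched here with placeholder names:

```cpp
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

// Placeholder predicate for "the membership change became visible".
static bool changeApplied(size_t attempt) { return attempt >= 3; }

// Bounded retry loop that also bails out when recovery starts, mirroring the
// `i < configuration_change_tries_count && !is_recovering` condition above.
bool waitForChange(size_t tries, const std::atomic<bool> & is_recovering)
{
    for (size_t i = 0; i < tries && !is_recovering; ++i)
    {
        if (changeApplied(i))
            return true;
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    return false;
}

int main()
{
    std::atomic<bool> is_recovering{false};
    std::cout << (waitForChange(5, is_recovering) ? "applied\n" : "gave up\n");
}
```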


@@ -5,6 +5,7 @@
#include <Coordination/KeeperStateMachine.h>
#include <Coordination/KeeperStateManager.h>
#include <Coordination/KeeperStorage.h>
#include <libnuraft/raft_params.hxx>
#include <libnuraft/raft_server.hxx>
#include <Poco/Util/AbstractConfiguration.h>
@@ -28,6 +29,7 @@ private:
nuraft::ptr<KeeperRaftServer> raft_instance;
nuraft::ptr<nuraft::asio_service> asio_service;
nuraft::ptr<nuraft::rpc_listener> asio_listener;
mutable std::mutex server_mutex;
std::mutex initialized_mutex;
std::atomic<bool> initialized_flag = false;
@@ -50,6 +52,8 @@ private:
void loadLatestConfig();
void recoveryMode(nuraft::raft_params & params);
std::atomic_bool is_recovering = false;
public:


@@ -29,26 +29,27 @@ def create_and_start_cluster(cluster_size):
quorum_size = get_quorum_size(cluster_size)
nodes = []
for i in range(1, cluster_size + quorum_size + 1):
for i in range(cluster_size):
nodes.append(
cluster.add_instance(
f"node{i}",
f"node{i+1}",
main_configs=[
f"{config_dir}/enable_keeper{i}.xml",
f"{config_dir}/enable_keeper{i+1}.xml",
f"{config_dir}/use_keeper.xml",
],
stay_alive=True,
)
)
for i in range(cluster_size, cluster_size + quorum_size):
nodes.append(
cluster.add_instance(f"node{i+1}", main_configs=[], stay_alive=True)
)
cluster.start()
return cluster, nodes
def smaller_exception(ex):
return "\n".join(str(ex).split("\n")[0:2])
def wait_node(cluster, node):
for _ in range(100):
zk = None
@@ -123,8 +124,8 @@ NOT_SERVING_REQUESTS_ERROR_MSG = "This instance is not currently serving request
@pytest.mark.parametrize("cluster_size", [3, 5])
def test_three_node_recovery(cluster_size):
cluster, nodes = create_and_start_cluster(3)
def test_cluster_recovery(cluster_size):
cluster, nodes = create_and_start_cluster(cluster_size)
quorum_size = get_quorum_size(cluster_size)
node_zks = []
try:
@@ -160,6 +161,8 @@ def test_three_node_recovery(cluster_size):
wait_and_assert_data(node_zk, "/test_force_recovery_extra", "somedataextra")
nodes[0].start_clickhouse()
wait_node(cluster, nodes[0])
node_zks[0] = get_fake_zk(cluster, nodes[0].name)
wait_and_assert_data(node_zks[0], "/test_force_recovery_extra", "somedataextra")
# stop last quorum size nodes
@@ -199,8 +202,16 @@ def test_three_node_recovery(cluster_size):
)
# add one node to restore the quorum
nodes[cluster_size].copy_file_to_container(
os.path.join(
BASE_DIR,
get_config_dir(cluster_size),
f"enable_keeper{cluster_size+1}.xml",
),
f"/etc/clickhouse-server/config.d/enable_keeper{cluster_size+1}.xml",
)
nodes[cluster_size].start_clickhouse()
wait_node(cluster, nodes[cluster_size])
wait_until_connected(cluster, nodes[cluster_size].name)
# node1 should have quorum now and accept requests
@@ -209,9 +220,15 @@ def test_three_node_recovery(cluster_size):
node_zks.append(get_fake_zk(cluster, nodes[cluster_size].name))
# add rest of the nodes
for node in nodes[cluster_size + 1 :]:
for i in range(cluster_size + 1, len(nodes)):
node = nodes[i]
node.copy_file_to_container(
os.path.join(
BASE_DIR, get_config_dir(cluster_size), f"enable_keeper{i+1}.xml"
),
f"/etc/clickhouse-server/config.d/enable_keeper{i+1}.xml",
)
node.start_clickhouse()
wait_node(cluster, node)
wait_until_connected(cluster, node.name)
node_zks.append(get_fake_zk(cluster, node.name))
@@ -225,6 +242,7 @@ def test_three_node_recovery(cluster_size):
wait_and_assert_data(node_zks[-1], "/test_force_recovery_last", "somedatalast")
nodes[0].start_clickhouse()
node_zks[0] = get_fake_zk(cluster, nodes[0].name)
for zk in node_zks[:nodes_left]:
assert_all_data(zk)
finally: