Refactoring

This commit is contained in:
Antonio Andelic 2022-04-14 12:00:47 +00:00
parent c968353ee9
commit 272965fc44
4 changed files with 77 additions and 80 deletions

View File

@ -579,6 +579,11 @@ void KeeperDispatcher::updateConfigurationThread()
}
}
/// The server counts as active only when it is initialized, a leader is known
/// and the underlying server is not in recovery mode.
bool KeeperDispatcher::isServerActive() const
{
    /// Checks run in the same order as before; the first failing one decides.
    if (!checkInit())
        return false;

    if (!hasLeader())
        return false;

    return !server->isRecovering();
}
void KeeperDispatcher::updateConfiguration(const Poco::Util::AbstractConfiguration & config)
{
auto diff = server->getConfigurationDiff(config);

View File

@ -116,9 +116,11 @@ public:
return server && server->checkInit();
}
bool isServerActive() const
bool isServerActive() const;
bool serverIsRecovering() const
{
return checkInit() && hasLeader() && !server->inRecover();
return server->isRecovering();
}
/// Registered in ConfigReloader callback. Add new configuration changes to
@ -156,11 +158,6 @@ public:
return server->isLeaderAlive();
}
/// Forwards to the underlying server and returns whatever server->inRecover() reports.
/// NOTE(review): `server` is dereferenced without a null check here — presumably callers
/// only use this after the server has been created; confirm against call sites.
bool inRecover() const
{
return server->inRecover();
}
bool isObserver() const
{
return server->isObserver();

View File

@ -104,12 +104,44 @@ KeeperServer::KeeperServer(
checkAndGetSuperdigest(configuration_and_settings_->super_digest)))
, state_manager(nuraft::cs_new<KeeperStateManager>(server_id, "keeper_server", configuration_and_settings_->log_storage_path, config, coordination_settings))
, log(&Poco::Logger::get("KeeperServer"))
, recover(config.has("keeper_server.recover") && config.getBool("keeper_server.recover"))
, is_recovering(config.has("keeper_server.recover") && config.getBool("keeper_server.recover"))
{
if (coordination_settings->quorum_reads)
LOG_WARNING(log, "Quorum reads enabled, Keeper will work slower.");
}
/// Thin extension of nuraft::raft_server that adds a cluster health check.
struct KeeperServer::KeeperRaftServer : public nuraft::raft_server
{
    using nuraft::raft_server::raft_server;

    /// Returns true when the number of not responding peers still leaves a quorum
    /// of voting members. Always returns false while the initial warm-up timer
    /// has not expired yet.
    bool isClusterHealthy()
    {
        if (timer_from_init)
        {
            size_t warmup_ms = get_current_params().heart_beat_interval_ *
                raft_server::raft_limits_.response_limit_;

            if (timer_from_init->elapsedMilliseconds() < warmup_ms)
                return false;

            /// Warm-up is over, the timer is not needed for subsequent calls.
            timer_from_init.reset();
        }

        const size_t voters = get_num_voting_members();
        const auto silent_peers = get_not_responding_peers();

        const auto quorum = voters / 2 + 1;
        const auto tolerated_silent_peers = voters - quorum;

        return silent_peers <= tolerated_silent_peers;
    }

    /// Peers are initially marked as responding, because at least one cycle
    /// of heartbeat * response_limit (20) needs to pass before a peer can be
    /// marked as not responding.
    /// Until that time passes we can't say that the cluster is healthy.
    std::optional<Stopwatch> timer_from_init = std::make_optional<Stopwatch>();
};
void KeeperServer::loadLatestConfig()
{
auto latest_snapshot_config = state_machine->getClusterConfig();
@ -147,11 +179,11 @@ void KeeperServer::loadLatestConfig()
void KeeperServer::forceRecovery()
{
shutdownRaftServer();
recover = true;
startupRaftServer(true);
is_recovering = true;
launchRaftServer(true);
}
void KeeperServer::startupRaftServer(bool enable_ipv6)
void KeeperServer::launchRaftServer(bool enable_ipv6)
{
nuraft::raft_params params;
params.heart_beat_interval_ = getValueOrMaxInt32AndLogWarning(coordination_settings->heart_beat_interval_ms.totalMilliseconds(), "heart_beat_interval_ms", log);
@ -180,45 +212,21 @@ void KeeperServer::startupRaftServer(bool enable_ipv6)
#endif
}
if (recover)
if (is_recovering)
{
LOG_WARNING(log, "This instance was started in recovery mode. Until the quorum is restored, no requests should be sent to any "
LOG_WARNING(log, "This instance is in recovery mode. Until the quorum is restored, no requests should be sent to any "
"of the cluster instances. This instance will start accepting requests only when the recovery is finished.");
params.with_custom_commit_quorum_size(1);
params.with_custom_election_quorum_size(1);
auto latest_config = state_manager->load_config();
auto new_config = std::make_shared<nuraft::cluster_config>(0, latest_config ? latest_config->get_log_idx() : 0);
new_config->get_servers() = last_read_config->get_servers();
new_config->get_servers() = last_local_config->get_servers();
new_config->set_log_idx(state_manager->getLogStore()->next_slot());
state_manager->save_config(*new_config);
}
launchRaftServer(enable_ipv6, params, asio_opts);
if (!raft_instance)
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot allocate RAFT instance");
}
/// Brings the Keeper server up, in this order: initializes the state machine,
/// loads the log store, loads the latest config, remembers the cluster
/// configuration parsed from `config`, and finally starts the RAFT server.
/// `enable_ipv6` is only passed through to startupRaftServer.
void KeeperServer::startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6)
{
state_machine->init();
/// The log store is loaded starting from the entry right after the last committed index.
state_manager->loadLogStore(state_machine->last_commit_index() + 1, coordination_settings->reserved_log_items);
loadLatestConfig();
/// Kept as a member so it is available later (it is read when the server is started in recovery mode).
last_read_config = state_manager->parseServersConfiguration(config, true).cluster_config;
startupRaftServer(enable_ipv6);
}
void KeeperServer::launchRaftServer(
bool enable_ipv6,
const nuraft::raft_params & params,
const nuraft::asio_service::options & asio_opts)
{
nuraft::raft_server::init_options init_options;
init_options.skip_initial_election_timeout_ = state_manager->shouldStartAsFollower();
@ -250,8 +258,24 @@ void KeeperServer::launchRaftServer(
raft_instance->start_server(state_manager->shouldStartAsFollower());
auto raft_server_ptr = std::static_pointer_cast<nuraft::raft_server>(raft_instance);
asio_listener->listen(raft_server_ptr);
nuraft::ptr<nuraft::raft_server> casted_raft_server = raft_instance;
asio_listener->listen(casted_raft_server);
if (!raft_instance)
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot allocate RAFT instance");
}
/// Brings the Keeper server up: state machine, log store, latest config,
/// local cluster configuration and finally the RAFT server itself.
/// `enable_ipv6` is only passed through to launchRaftServer.
void KeeperServer::startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6)
{
    state_machine->init();

    /// Load the log store starting right after the last committed index.
    const auto first_log_index = state_machine->last_commit_index() + 1;
    state_manager->loadLogStore(first_log_index, coordination_settings->reserved_log_items);

    loadLatestConfig();

    /// Remember the cluster configuration described by the local config.
    last_local_config = state_manager->parseServersConfiguration(config, true).cluster_config;

    launchRaftServer(enable_ipv6);
}
void KeeperServer::shutdownRaftServer()
@ -376,15 +400,15 @@ uint64_t KeeperServer::getSyncedFollowerCount() const
nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * param)
{
if (type == nuraft::cb_func::HeartBeat && recover && raft_instance->isClusterHealthy())
if (type == nuraft::cb_func::HeartBeat && is_recovering && raft_instance->isClusterHealthy())
{
auto new_params = raft_instance->get_current_params();
new_params.custom_commit_quorum_size_ = 0;
new_params.custom_election_quorum_size_ = 0;
raft_instance->update_params(new_params);
LOG_INFO(log, "Recovery is done");
recover = false;
LOG_INFO(log, "Recovery is done. You can continue using cluster normally.");
is_recovering = false;
return nuraft::cb_func::ReturnCode::Ok;
}
@ -465,7 +489,7 @@ ConfigUpdateActions KeeperServer::getConfigurationDiff(const Poco::Util::Abstrac
auto diff = state_manager->getConfigurationDiff(config);
if (!diff.empty())
last_read_config = state_manager->parseServersConfiguration(config, true).cluster_config;
last_local_config = state_manager->parseServersConfiguration(config, true).cluster_config;
return diff;
}

View File

@ -6,6 +6,7 @@
#include <Coordination/KeeperStorage.h>
#include <Coordination/CoordinationSettings.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <libnuraft/raft_server.hxx>
namespace DB
{
@ -15,33 +16,6 @@ using RaftAppendResult = nuraft::ptr<nuraft::cmd_result<nuraft::ptr<nuraft::buff
class KeeperServer
{
private:
/// Extension of nuraft::raft_server that adds a cluster health check.
struct KeeperRaftServer : public nuraft::raft_server
{
/// Returns true when the number of not responding peers does not exceed
/// voting_members - quorum_size, i.e. a quorum of voting members is still reachable.
/// Returns false while less than heart_beat_interval_ * response_limit_ milliseconds
/// have elapsed on timer_from_init.
bool isClusterHealthy()
{
if (timer_from_init)
{
/// Minimal time that has to pass before the health check is considered meaningful.
size_t expiry = get_current_params().heart_beat_interval_ *
raft_server::raft_limits_.response_limit_;
if (timer_from_init->elapsedMilliseconds() < expiry)
return false;
}
const size_t voting_members = get_num_voting_members();
const auto not_responding_peers = get_not_responding_peers();
/// Majority of voting members.
const auto quorum_size = voting_members / 2 + 1;
/// How many peers may be silent while a quorum is still possible.
const auto max_not_responding_peers = voting_members - quorum_size;
return not_responding_peers <= max_not_responding_peers;
}
/// Inherit all constructors of nuraft::raft_server.
using nuraft::raft_server::raft_server;
/// Created together with the object; presumably the Stopwatch starts counting on construction — TODO confirm.
std::optional<Stopwatch> timer_from_init = std::make_optional<Stopwatch>();
};
const int server_id;
CoordinationSettingsPtr coordination_settings;
@ -50,6 +24,7 @@ private:
nuraft::ptr<KeeperStateManager> state_manager;
struct KeeperRaftServer;
nuraft::ptr<KeeperRaftServer> raft_instance;
nuraft::ptr<nuraft::asio_service> asio_service;
nuraft::ptr<nuraft::rpc_listener> asio_listener;
@ -59,7 +34,7 @@ private:
std::condition_variable initialized_cv;
std::atomic<bool> initial_batch_committed = false;
nuraft::ptr<nuraft::cluster_config> last_read_config;
nuraft::ptr<nuraft::cluster_config> last_local_config;
Poco::Logger * log;
@ -67,19 +42,15 @@ private:
/// Used to determine the moment when raft is ready to serve new requests
nuraft::cb_func::ReturnCode callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * param);
void startupRaftServer(bool enable_ipv6);
/// Almost copy-paste from nuraft::launcher, but with separated server init and start
/// Allows to avoid race conditions.
void launchRaftServer(
bool enable_ipv6,
const nuraft::raft_params & params,
const nuraft::asio_service::options & asio_opts);
void launchRaftServer(bool enable_ipv6);
void shutdownRaftServer();
void loadLatestConfig();
std::atomic_bool recover = false;
std::atomic_bool is_recovering = false;
public:
KeeperServer(
@ -95,9 +66,9 @@ public:
/// responses queue
void putLocalReadRequest(const KeeperStorage::RequestForSession & request);
bool inRecover() const
bool isRecovering() const
{
return recover;
return is_recovering;
}
/// Put batch of requests into Raft and get result of put. Responses will be set separately into