Use 1 leader quorum for recovery

This commit is contained in:
Antonio Andelic 2022-04-11 06:41:46 +00:00
parent 0e1ba927bd
commit dbd88a5acb
4 changed files with 25 additions and 20 deletions

View File

@ -278,7 +278,7 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
try
{
LOG_DEBUG(log, "Waiting server to initialize");
server->startup(configuration_and_settings->enable_ipv6);
server->startup(config, configuration_and_settings->enable_ipv6);
LOG_DEBUG(log, "Server initialized, waiting for quorum");
if (!start_async)

View File

@ -14,6 +14,7 @@
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <string>
#include <filesystem>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
#include <boost/algorithm/string.hpp>
@ -113,23 +114,6 @@ void KeeperServer::loadLatestConfig()
auto latest_snapshot_config = state_machine->getClusterConfig();
auto latest_log_store_config = state_manager->getLatestConfigFromLogStore();
if (recover)
{
auto local_cluster_config = state_manager->getLocalConfig();
latest_log_store_config = std::make_shared<nuraft::cluster_config>(0, latest_log_store_config ? latest_log_store_config->get_log_idx() : 0);
latest_log_store_config->get_servers() = local_cluster_config->get_servers();
latest_log_store_config->set_log_idx(state_manager->getLogStore()->next_slot());
for (auto & server : latest_log_store_config->get_servers())
{
LOG_INFO(log, "Having server {} with log idx {}", server->get_id(), latest_log_store_config->get_log_idx());
}
state_manager->save_config(*latest_log_store_config);
return;
}
if (latest_snapshot_config && latest_log_store_config)
{
if (latest_snapshot_config->get_log_idx() > latest_log_store_config->get_log_idx())
@ -159,7 +143,7 @@ void KeeperServer::loadLatestConfig()
}
}
void KeeperServer::startup(bool enable_ipv6)
void KeeperServer::startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6)
{
state_machine->init();
@ -201,6 +185,24 @@ void KeeperServer::startup(bool enable_ipv6)
launchRaftServer(enable_ipv6, params, asio_opts);
if (recover)
{
auto configuration = state_manager->parseServersConfiguration(config, false);
auto local_cluster_config = configuration.cluster_config;
auto latest_log_store_config = std::make_shared<nuraft::cluster_config>(0, local_cluster_config ? local_cluster_config->get_log_idx() : 0);
latest_log_store_config->get_servers() = local_cluster_config->get_servers();
latest_log_store_config->set_log_idx(state_manager->getLogStore()->next_slot());
for (auto & server : latest_log_store_config->get_servers())
{
LOG_INFO(log, "Having server {} with log idx {}", server->get_id(), latest_log_store_config->get_log_idx());
}
state_manager->save_config(*latest_log_store_config);
return;
}
if (!raft_instance)
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot allocate RAFT instance");
}
@ -242,6 +244,7 @@ void KeeperServer::launchRaftServer(
raft_instance->start_server(init_options.skip_initial_election_timeout_);
asio_listener->listen(raft_instance);
}
void KeeperServer::shutdownRaftServer()

View File

@ -7,6 +7,7 @@
#include <Coordination/KeeperStorage.h>
#include <Coordination/CoordinationSettings.h>
#include <base/logger_useful.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
@ -59,7 +60,7 @@ public:
SnapshotsQueue & snapshots_queue_);
/// Load state machine from the latest snapshot and load log storage. Start NuRaft with required settings.
void startup(bool enable_ipv6 = true);
void startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6 = true);
/// Put local read request and execute in state machine directly and response into
/// responses queue

View File

@ -140,6 +140,7 @@ private:
nuraft::ptr<KeeperLogStore> log_store;
nuraft::ptr<nuraft::srv_state> server_state;
public:
/// Parse configuration from xml config.
KeeperConfigurationWrapper parseServersConfiguration(const Poco::Util::AbstractConfiguration & config, bool allow_without_us) const;
};