From dbd88a5acb0a0b60378e59b5e18a578957c40341 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Mon, 11 Apr 2022 06:41:46 +0000 Subject: [PATCH] Use 1 leader quorum for recovery --- src/Coordination/KeeperDispatcher.cpp | 2 +- src/Coordination/KeeperServer.cpp | 39 ++++++++++++++------------- src/Coordination/KeeperServer.h | 3 ++- src/Coordination/KeeperStateManager.h | 1 + 4 files changed, 25 insertions(+), 20 deletions(-) diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 4d71c11221e..65a6fbb287a 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -278,7 +278,7 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf try { LOG_DEBUG(log, "Waiting server to initialize"); - server->startup(configuration_and_settings->enable_ipv6); + server->startup(config, configuration_and_settings->enable_ipv6); LOG_DEBUG(log, "Server initialized, waiting for quorum"); if (!start_async) diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index 13d7f67e697..39df5ff61fa 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -113,23 +114,6 @@ void KeeperServer::loadLatestConfig() auto latest_snapshot_config = state_machine->getClusterConfig(); auto latest_log_store_config = state_manager->getLatestConfigFromLogStore(); - if (recover) - { - auto local_cluster_config = state_manager->getLocalConfig(); - latest_log_store_config = std::make_shared(0, latest_log_store_config ? latest_log_store_config->get_log_idx() : 0); - latest_log_store_config->get_servers() = local_cluster_config->get_servers(); - latest_log_store_config->set_log_idx(state_manager->getLogStore()->next_slot()); - - for (auto & server : latest_log_store_config->get_servers()) - { - LOG_INFO(log, "Having server {} with log idx {}", server->get_id(), latest_log_store_config->get_log_idx()); - } - - - state_manager->save_config(*latest_log_store_config); - return; - } - if (latest_snapshot_config && latest_log_store_config) { if (latest_snapshot_config->get_log_idx() > latest_log_store_config->get_log_idx()) @@ -159,7 +143,7 @@ void KeeperServer::loadLatestConfig() } } -void KeeperServer::startup(bool enable_ipv6) +void KeeperServer::startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6) { state_machine->init(); @@ -201,6 +185,24 @@ void KeeperServer::startup(bool enable_ipv6) launchRaftServer(enable_ipv6, params, asio_opts); + if (recover) + { + auto configuration = state_manager->parseServersConfiguration(config, false); + auto local_cluster_config = configuration.cluster_config; + auto latest_log_store_config = std::make_shared(0, local_cluster_config ? local_cluster_config->get_log_idx() : 0); + latest_log_store_config->get_servers() = local_cluster_config->get_servers(); + latest_log_store_config->set_log_idx(state_manager->getLogStore()->next_slot()); + + for (auto & server : latest_log_store_config->get_servers()) + { + LOG_INFO(log, "Having server {} with log idx {}", server->get_id(), latest_log_store_config->get_log_idx()); + } + + + state_manager->save_config(*latest_log_store_config); + return; + } + if (!raft_instance) throw Exception(ErrorCodes::RAFT_ERROR, "Cannot allocate RAFT instance"); } @@ -242,6 +244,7 @@ void KeeperServer::launchRaftServer( raft_instance->start_server(init_options.skip_initial_election_timeout_); asio_listener->listen(raft_instance); + } void KeeperServer::shutdownRaftServer() diff --git a/src/Coordination/KeeperServer.h b/src/Coordination/KeeperServer.h index 6aa5e8f3c77..210777a4086 100644 --- a/src/Coordination/KeeperServer.h +++ b/src/Coordination/KeeperServer.h @@ -7,6 +7,7 @@ #include #include #include +#include namespace DB { @@ -59,7 +60,7 @@ public: SnapshotsQueue & snapshots_queue_); /// Load state machine from the latest snapshot and load log storage. Start NuRaft with required settings. - void startup(bool enable_ipv6 = true); + void startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6 = true); /// Put local read request and execute in state machine directly and response into /// responses queue diff --git a/src/Coordination/KeeperStateManager.h b/src/Coordination/KeeperStateManager.h index 2beb9670355..f1830ee4b79 100644 --- a/src/Coordination/KeeperStateManager.h +++ b/src/Coordination/KeeperStateManager.h @@ -140,6 +140,7 @@ private: nuraft::ptr log_store; nuraft::ptr server_state; +public: /// Parse configuration from xml config. KeeperConfigurationWrapper parseServersConfiguration(const Poco::Util::AbstractConfiguration & config, bool allow_without_us) const; };