Force recover from configuration

This commit is contained in:
Antonio Andelic 2022-04-07 09:25:01 +00:00
parent 8a2e006cc4
commit bb4bc17af1
5 changed files with 44 additions and 6 deletions

View File

@ -102,20 +102,35 @@ KeeperServer::KeeperServer(
checkAndGetSuperdigest(configuration_and_settings_->super_digest)))
, state_manager(nuraft::cs_new<KeeperStateManager>(server_id, "keeper_server", configuration_and_settings_->log_storage_path, config, coordination_settings))
, log(&Poco::Logger::get("KeeperServer"))
, recover(config.getBool("recover"))
{
if (coordination_settings->quorum_reads)
LOG_WARNING(log, "Quorum reads enabled, Keeper will work slower.");
}
void KeeperServer::startup(bool enable_ipv6)
void KeeperServer::loadLatestConfig()
{
state_machine->init();
state_manager->loadLogStore(state_machine->last_commit_index() + 1, coordination_settings->reserved_log_items);
auto latest_snapshot_config = state_machine->getClusterConfig();
auto latest_log_store_config = state_manager->getLatestConfigFromLogStore();
if (recover)
{
auto local_cluster_config = state_manager->getLocalConfig();
latest_log_store_config = std::make_shared<nuraft::cluster_config>(0, latest_log_store_config ? latest_log_store_config->get_log_idx() : 0);
latest_log_store_config->get_servers() = local_cluster_config->get_servers();
latest_log_store_config->set_log_idx(state_manager->getLogStore()->next_slot());
for (auto & server : latest_log_store_config->get_servers())
{
LOG_INFO(log, "Having server {} with log idx {}", server->get_id(), latest_log_store_config->get_log_idx());
}
state_manager->save_config(*latest_log_store_config);
state_machine->commit_config(latest_log_store_config->get_log_idx(), latest_log_store_config);
return;
}
if (latest_snapshot_config && latest_log_store_config)
{
if (latest_snapshot_config->get_log_idx() > latest_log_store_config->get_log_idx())
@ -143,6 +158,13 @@ void KeeperServer::startup(bool enable_ipv6)
{
LOG_INFO(log, "No config in log store and snapshot, probably it's initial run. Will use config from .xml on disk");
}
}
void KeeperServer::startup(bool enable_ipv6)
{
state_machine->init();
state_manager->loadLogStore(state_machine->last_commit_index() + 1, coordination_settings->reserved_log_items);
nuraft::raft_params params;
params.heart_beat_interval_ = getValueOrMaxInt32AndLogWarning(coordination_settings->heart_beat_interval_ms.totalMilliseconds(), "heart_beat_interval_ms", log);
@ -204,6 +226,8 @@ void KeeperServer::launchRaftServer(
nuraft::ptr<nuraft::state_mgr> casted_state_manager = state_manager;
nuraft::ptr<nuraft::state_machine> casted_state_machine = state_machine;
loadLatestConfig();
/// raft_server creates unique_ptr from it
nuraft::context * ctx = new nuraft::context(
casted_state_manager, casted_state_machine,

View File

@ -48,6 +48,9 @@ private:
void shutdownRaftServer();
void loadLatestConfig();
bool recover = false;
public:
KeeperServer(
const KeeperConfigurationAndSettingsPtr & settings_,

View File

@ -175,11 +175,12 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
}
void KeeperStateMachine::commit_config(const uint64_t /*log_idx*/, nuraft::ptr<nuraft::cluster_config> & new_conf)
void KeeperStateMachine::commit_config(const uint64_t log_idx, nuraft::ptr<nuraft::cluster_config> & new_conf)
{
std::lock_guard lock(cluster_config_lock);
auto tmp = new_conf->serialize();
cluster_config = ClusterConfig::deserialize(*tmp);
last_committed_idx = log_idx;
}
nuraft::ptr<nuraft::snapshot> KeeperStateMachine::last_snapshot()

View File

@ -248,6 +248,7 @@ void KeeperStateManager::flushLogStore()
void KeeperStateManager::save_config(const nuraft::cluster_config & config)
{
LOG_INFO(&Poco::Logger::get("State Manager"), "Save config called");
std::lock_guard lock(configuration_wrapper_mutex);
nuraft::ptr<nuraft::buffer> buf = config.serialize();
configuration_wrapper.cluster_config = nuraft::cluster_config::deserialize(*buf);

View File

@ -6,6 +6,7 @@
#include <Coordination/CoordinationSettings.h>
#include <libnuraft/nuraft.hxx>
#include <Poco/Util/AbstractConfiguration.h>
#include "Coordination/KeeperStateMachine.h"
#include <Coordination/KeeperSnapshotManager.h>
namespace DB
@ -103,9 +104,17 @@ public:
/// Read all log entries in log store from the begging and return latest config (with largest log_index)
ClusterConfigPtr getLatestConfigFromLogStore() const;
ClusterConfigPtr getLocalConfig() const
{
std::lock_guard lock{configuration_wrapper_mutex};
return configuration_wrapper.cluster_config;
}
/// Get configuration diff between proposed XML and current state in RAFT
ConfigUpdateActions getConfigurationDiff(const Poco::Util::AbstractConfiguration & config) const;
void applyConfigsToLogEntry();
private:
/// Wrapper struct for Keeper cluster config. We parse this
/// info from XML files.