diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index 30db486dd1b..25d985a7ea3 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -102,20 +102,35 @@ KeeperServer::KeeperServer( checkAndGetSuperdigest(configuration_and_settings_->super_digest))) , state_manager(nuraft::cs_new(server_id, "keeper_server", configuration_and_settings_->log_storage_path, config, coordination_settings)) , log(&Poco::Logger::get("KeeperServer")) + , recover(config.getBool("recover")) { if (coordination_settings->quorum_reads) LOG_WARNING(log, "Quorum reads enabled, Keeper will work slower."); } -void KeeperServer::startup(bool enable_ipv6) +void KeeperServer::loadLatestConfig() { - state_machine->init(); - - state_manager->loadLogStore(state_machine->last_commit_index() + 1, coordination_settings->reserved_log_items); - auto latest_snapshot_config = state_machine->getClusterConfig(); auto latest_log_store_config = state_manager->getLatestConfigFromLogStore(); + if (recover) + { + auto local_cluster_config = state_manager->getLocalConfig(); + latest_log_store_config = std::make_shared(0, latest_log_store_config ? latest_log_store_config->get_log_idx() : 0); + latest_log_store_config->get_servers() = local_cluster_config->get_servers(); + latest_log_store_config->set_log_idx(state_manager->getLogStore()->next_slot()); + + for (auto & server : latest_log_store_config->get_servers()) + { + LOG_INFO(log, "Having server {} with log idx {}", server->get_id(), latest_log_store_config->get_log_idx()); + } + + + state_manager->save_config(*latest_log_store_config); + state_machine->commit_config(latest_log_store_config->get_log_idx(), latest_log_store_config); + return; + } + if (latest_snapshot_config && latest_log_store_config) { if (latest_snapshot_config->get_log_idx() > latest_log_store_config->get_log_idx()) @@ -143,6 +158,13 @@ void KeeperServer::startup(bool enable_ipv6) { LOG_INFO(log, "No config in log store and snapshot, probably it's initial run. Will use config from .xml on disk"); } +} + +void KeeperServer::startup(bool enable_ipv6) +{ + state_machine->init(); + + state_manager->loadLogStore(state_machine->last_commit_index() + 1, coordination_settings->reserved_log_items); nuraft::raft_params params; params.heart_beat_interval_ = getValueOrMaxInt32AndLogWarning(coordination_settings->heart_beat_interval_ms.totalMilliseconds(), "heart_beat_interval_ms", log); @@ -204,6 +226,8 @@ void KeeperServer::launchRaftServer( nuraft::ptr casted_state_manager = state_manager; nuraft::ptr casted_state_machine = state_machine; + loadLatestConfig(); + /// raft_server creates unique_ptr from it nuraft::context * ctx = new nuraft::context( casted_state_manager, casted_state_machine, diff --git a/src/Coordination/KeeperServer.h b/src/Coordination/KeeperServer.h index 4ed88ceb855..6aa5e8f3c77 100644 --- a/src/Coordination/KeeperServer.h +++ b/src/Coordination/KeeperServer.h @@ -48,6 +48,9 @@ private: void shutdownRaftServer(); + void loadLatestConfig(); + bool recover = false; + public: KeeperServer( const KeeperConfigurationAndSettingsPtr & settings_, diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 315882ee988..08f137cd1a8 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -175,11 +175,12 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s) } -void KeeperStateMachine::commit_config(const uint64_t /*log_idx*/, nuraft::ptr & new_conf) +void KeeperStateMachine::commit_config(const uint64_t log_idx, nuraft::ptr & new_conf) { std::lock_guard lock(cluster_config_lock); auto tmp = new_conf->serialize(); cluster_config = ClusterConfig::deserialize(*tmp); + last_committed_idx = log_idx; } nuraft::ptr KeeperStateMachine::last_snapshot() diff --git a/src/Coordination/KeeperStateManager.cpp b/src/Coordination/KeeperStateManager.cpp index f9bfea5e69a..909ad15a02c 100644 --- a/src/Coordination/KeeperStateManager.cpp +++ b/src/Coordination/KeeperStateManager.cpp @@ -248,6 +248,7 @@ void KeeperStateManager::flushLogStore() void KeeperStateManager::save_config(const nuraft::cluster_config & config) { + LOG_INFO(&Poco::Logger::get("State Manager"), "Save config called"); std::lock_guard lock(configuration_wrapper_mutex); nuraft::ptr buf = config.serialize(); configuration_wrapper.cluster_config = nuraft::cluster_config::deserialize(*buf); diff --git a/src/Coordination/KeeperStateManager.h b/src/Coordination/KeeperStateManager.h index 66c6cc03b87..2beb9670355 100644 --- a/src/Coordination/KeeperStateManager.h +++ b/src/Coordination/KeeperStateManager.h @@ -6,6 +6,7 @@ #include #include #include +#include "Coordination/KeeperStateMachine.h" #include namespace DB @@ -103,9 +104,17 @@ public: /// Read all log entries in log store from the begging and return latest config (with largest log_index) ClusterConfigPtr getLatestConfigFromLogStore() const; + ClusterConfigPtr getLocalConfig() const + { + std::lock_guard lock{configuration_wrapper_mutex}; + return configuration_wrapper.cluster_config; + } + /// Get configuration diff between proposed XML and current state in RAFT ConfigUpdateActions getConfigurationDiff(const Poco::Util::AbstractConfiguration & config) const; + void applyConfigsToLogEntry(); + private: /// Wrapper struct for Keeper cluster config. We parse this /// info from XML files.