Make state durable

This commit is contained in:
Antonio Andelic 2022-07-10 09:33:49 +00:00
parent 1bd97fa6fa
commit f60074e9ec
6 changed files with 110 additions and 12 deletions

View File

@ -186,6 +186,8 @@ KeeperConfigurationAndSettings::loadFromConfig(const Poco::Util::AbstractConfigu
ret->log_storage_path = getLogsPathFromConfig(config, standalone_keeper_); ret->log_storage_path = getLogsPathFromConfig(config, standalone_keeper_);
ret->snapshot_storage_path = getSnapshotsPathFromConfig(config, standalone_keeper_); ret->snapshot_storage_path = getSnapshotsPathFromConfig(config, standalone_keeper_);
ret->state_file_path = getStateFilePathFromConfig(config, standalone_keeper_);
ret->coordination_settings->loadFromConfig("keeper_server.coordination_settings", config); ret->coordination_settings->loadFromConfig("keeper_server.coordination_settings", config);
return ret; return ret;
@ -221,4 +223,12 @@ String KeeperConfigurationAndSettings::getSnapshotsPathFromConfig(const Poco::Ut
return std::filesystem::path{config.getString("path", DBMS_DEFAULT_PATH)} / "coordination/snapshots"; return std::filesystem::path{config.getString("path", DBMS_DEFAULT_PATH)} / "coordination/snapshots";
} }
/// Builds the path of the file in which the Raft server state is stored.
/// The root directory is read from the `path` config key. For a standalone Keeper the fallback
/// root is KEEPER_DEFAULT_PATH and the file is `<root>/state`; otherwise the fallback root is
/// DBMS_DEFAULT_PATH and the file is `<root>/coordination/state`.
String KeeperConfigurationAndSettings::getStateFilePathFromConfig(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper_)
{
    if (!standalone_keeper_)
    {
        const std::filesystem::path server_root{config.getString("path", DBMS_DEFAULT_PATH)};
        return server_root / "coordination/state";
    }

    const std::filesystem::path keeper_root{config.getString("path", KEEPER_DEFAULT_PATH)};
    return keeper_root / "state";
}
} }

View File

@ -77,6 +77,7 @@ struct KeeperConfigurationAndSettings
String log_storage_path; String log_storage_path;
String snapshot_storage_path; String snapshot_storage_path;
String state_file_path;
void dump(WriteBufferFromOwnString & buf) const; void dump(WriteBufferFromOwnString & buf) const;
static std::shared_ptr<KeeperConfigurationAndSettings> loadFromConfig(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper_); static std::shared_ptr<KeeperConfigurationAndSettings> loadFromConfig(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper_);
@ -84,6 +85,7 @@ struct KeeperConfigurationAndSettings
private: private:
static String getLogsPathFromConfig(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper_); static String getLogsPathFromConfig(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper_);
static String getSnapshotsPathFromConfig(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper_); static String getSnapshotsPathFromConfig(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper_);
static String getStateFilePathFromConfig(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper_);
}; };
using KeeperConfigurationAndSettingsPtr = std::shared_ptr<KeeperConfigurationAndSettings>; using KeeperConfigurationAndSettingsPtr = std::shared_ptr<KeeperConfigurationAndSettings>;

View File

@ -113,7 +113,7 @@ KeeperServer::KeeperServer(
checkAndGetSuperdigest(configuration_and_settings_->super_digest), checkAndGetSuperdigest(configuration_and_settings_->super_digest),
config.getBool("keeper_server.digest_enabled", true))) config.getBool("keeper_server.digest_enabled", true)))
, state_manager(nuraft::cs_new<KeeperStateManager>( , state_manager(nuraft::cs_new<KeeperStateManager>(
server_id, "keeper_server", configuration_and_settings_->log_storage_path, config, coordination_settings)) server_id, "keeper_server", configuration_and_settings_->log_storage_path, configuration_and_settings_->state_file_path, config, coordination_settings))
, log(&Poco::Logger::get("KeeperServer")) , log(&Poco::Logger::get("KeeperServer"))
, is_recovering(config.has("keeper_server.force_recovery") && config.getBool("keeper_server.force_recovery")) , is_recovering(config.has("keeper_server.force_recovery") && config.getBool("keeper_server.force_recovery"))
{ {

View File

@ -5,6 +5,7 @@
#include <Common/DNSResolver.h> #include <Common/DNSResolver.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/isLocalAddress.h> #include <Common/isLocalAddress.h>
#include "IO/ReadHelpers.h"
#include <mutex>
#include <set>
namespace DB namespace DB
{ {
@ -199,8 +200,9 @@ KeeperStateManager::parseServersConfiguration(const Poco::Util::AbstractConfigur
return result; return result;
} }
KeeperStateManager::KeeperStateManager(int server_id_, const std::string & host, int port, const std::string & logs_path) KeeperStateManager::KeeperStateManager(int server_id_, const std::string & host, int port, const std::string & logs_path, const std::string & state_file_path)
: my_server_id(server_id_), secure(false), log_store(nuraft::cs_new<KeeperLogStore>(logs_path, 5000, false, false)) : my_server_id(server_id_), secure(false), log_store(nuraft::cs_new<KeeperLogStore>(logs_path, 5000, false, false)),
server_state_path(state_file_path)
{ {
auto peer_config = nuraft::cs_new<nuraft::srv_config>(my_server_id, host + ":" + std::to_string(port)); auto peer_config = nuraft::cs_new<nuraft::srv_config>(my_server_id, host + ":" + std::to_string(port));
configuration_wrapper.cluster_config = nuraft::cs_new<nuraft::cluster_config>(); configuration_wrapper.cluster_config = nuraft::cs_new<nuraft::cluster_config>();
@ -213,6 +215,7 @@ KeeperStateManager::KeeperStateManager(
int my_server_id_, int my_server_id_,
const std::string & config_prefix_, const std::string & config_prefix_,
const std::string & log_storage_path, const std::string & log_storage_path,
const std::string & state_file_path,
const Poco::Util::AbstractConfiguration & config, const Poco::Util::AbstractConfiguration & config,
const CoordinationSettingsPtr & coordination_settings) const CoordinationSettingsPtr & coordination_settings)
: my_server_id(my_server_id_) : my_server_id(my_server_id_)
@ -224,6 +227,7 @@ KeeperStateManager::KeeperStateManager(
coordination_settings->rotate_log_storage_interval, coordination_settings->rotate_log_storage_interval,
coordination_settings->force_sync, coordination_settings->force_sync,
coordination_settings->compress_logs)) coordination_settings->compress_logs))
, server_state_path(state_file_path)
{ {
} }
@ -261,10 +265,79 @@ void KeeperStateManager::save_config(const nuraft::cluster_config & config)
configuration_wrapper.cluster_config = nuraft::cluster_config::deserialize(*buf); configuration_wrapper.cluster_config = nuraft::cluster_config::deserialize(*buf);
} }
/// Returns the path of the backup copy of the server state file: the same directory and file name
/// as `server_state_path`, with "-OLD" appended to the file name.
const std::filesystem::path & KeeperStateManager::getOldServerStatePath()
{
    /// A single function-local static path would be initialized once per process, by whichever
    /// instance happens to call first, and then be handed to every other KeeperStateManager even if
    /// its `server_state_path` differs. The backup path is therefore derived from this instance's
    /// path on every call. Results are interned in a set only so that a stable reference can still
    /// be returned: std::set does not relocate its elements and nothing is ever erased from it.
    static std::mutex known_paths_mutex;
    static std::set<std::filesystem::path> known_paths;

    std::lock_guard<std::mutex> lock(known_paths_mutex);
    return *known_paths.insert(
        server_state_path.parent_path() / (server_state_path.filename().generic_string() + "-OLD")).first;
}
/// Writes the serialized Raft server state to `server_state_path`.
/// An already existing state file is first moved to the "-OLD" path and that backup is removed
/// only after the new file has been written, synced and closed.
void KeeperStateManager::save_state(const nuraft::srv_state & state)
{
    const auto & backup_path = getOldServerStatePath();

    /// Keep the previous state around as a backup while the new one is being written.
    const bool has_previous_state = std::filesystem::exists(server_state_path);
    if (has_previous_state)
        std::filesystem::rename(server_state_path, backup_path);

    WriteBufferFromFile state_file(server_state_path, DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);

    auto serialized_state = state.serialize();
    const auto * raw_bytes = reinterpret_cast<const char *>(serialized_state->data_begin());
    state_file.write(raw_bytes, serialized_state->size());
    state_file.sync();
    state_file.close();

    /// The new state file is complete, so the backup is no longer needed.
    std::filesystem::remove(backup_path);
}
/// Loads the Raft server state from disk.
/// The main state file (`server_state_path`) is tried first. If it is missing, empty or cannot be
/// deserialized, the "-OLD" backup that save_state() leaves behind while rewriting the main file
/// is tried instead.
/// Returns nullptr when no usable state was found in either file.
nuraft::ptr<nuraft::srv_state> KeeperStateManager::read_state()
{
    const auto & old_path = getOldServerStatePath();

    /// Reads the whole file at `path` and deserializes it.
    /// Returns nullptr for an empty file or when deserialization fails with std::overflow_error
    /// (presumably what NuRaft throws for a truncated buffer - TODO confirm).
    const auto try_read_file = [](const auto & path) -> nuraft::ptr<nuraft::srv_state>
    {
        ReadBufferFromFile read_buf(path);
        auto content_size = read_buf.getFileSize();
        if (content_size == 0)
            return nullptr;

        auto state_buf = nuraft::buffer::alloc(content_size);
        read_buf.read(reinterpret_cast<char *>(state_buf->data_begin()), content_size);
        assertEOF(read_buf);

        try
        {
            auto state = nuraft::srv_state::deserialize(*state_buf);
            LOG_INFO(&Poco::Logger::get("KeeperStateManager"), "Read state from {}", path.generic_string());
            return state;
        }
        catch (const std::overflow_error &)
        {
            LOG_WARNING(&Poco::Logger::get("KeeperStateManager"), "Failed to deserialize state from {}", path.generic_string());
            return nullptr;
        }
    };

    if (std::filesystem::exists(server_state_path))
    {
        auto state = try_read_file(server_state_path);
        if (state)
            return state;
    }

    if (std::filesystem::exists(old_path))
    {
        /// Read the backup itself, not the main file that has just been found unusable.
        auto state = try_read_file(old_path);
        if (state)
        {
            /// Put the good backup back in place of the main file. Otherwise the next save_state()
            /// would move an unusable main file over this backup before writing the new state.
            std::filesystem::rename(old_path, server_state_path);
            return state;
        }
    }

    return nullptr;
}
ConfigUpdateActions KeeperStateManager::getConfigurationDiff(const Poco::Util::AbstractConfiguration & config) const ConfigUpdateActions KeeperStateManager::getConfigurationDiff(const Poco::Util::AbstractConfiguration & config) const

View File

@ -40,6 +40,7 @@ public:
int server_id_, int server_id_,
const std::string & config_prefix_, const std::string & config_prefix_,
const std::string & log_storage_path, const std::string & log_storage_path,
const std::string & state_file_path,
const Poco::Util::AbstractConfiguration & config, const Poco::Util::AbstractConfiguration & config,
const CoordinationSettingsPtr & coordination_settings); const CoordinationSettingsPtr & coordination_settings);
@ -48,7 +49,8 @@ public:
int server_id_, int server_id_,
const std::string & host, const std::string & host,
int port, int port,
const std::string & logs_path); const std::string & logs_path,
const std::string & state_file_path);
void loadLogStore(uint64_t last_commited_index, uint64_t logs_to_keep); void loadLogStore(uint64_t last_commited_index, uint64_t logs_to_keep);
@ -67,7 +69,7 @@ public:
void save_state(const nuraft::srv_state & state) override; void save_state(const nuraft::srv_state & state) override;
nuraft::ptr<nuraft::srv_state> read_state() override { return server_state; } nuraft::ptr<nuraft::srv_state> read_state() override;
nuraft::ptr<nuraft::log_store> load_log_store() override { return log_store; } nuraft::ptr<nuraft::log_store> load_log_store() override { return log_store; }
@ -109,6 +111,8 @@ public:
ConfigUpdateActions getConfigurationDiff(const Poco::Util::AbstractConfiguration & config) const; ConfigUpdateActions getConfigurationDiff(const Poco::Util::AbstractConfiguration & config) const;
private: private:
const std::filesystem::path & getOldServerStatePath();
/// Wrapper struct for Keeper cluster config. We parse this /// Wrapper struct for Keeper cluster config. We parse this
/// info from XML files. /// info from XML files.
struct KeeperConfigurationWrapper struct KeeperConfigurationWrapper
@ -131,7 +135,8 @@ private:
KeeperConfigurationWrapper configuration_wrapper; KeeperConfigurationWrapper configuration_wrapper;
nuraft::ptr<KeeperLogStore> log_store; nuraft::ptr<KeeperLogStore> log_store;
nuraft::ptr<nuraft::srv_state> server_state;
const std::filesystem::path server_state_path;
public: public:
/// Parse configuration from xml config. /// Parse configuration from xml config.

View File

@ -106,13 +106,13 @@ TEST_P(CoordinationTest, BufferSerde)
template <typename StateMachine> template <typename StateMachine>
struct SimpliestRaftServer struct SimpliestRaftServer
{ {
SimpliestRaftServer(int server_id_, const std::string & hostname_, int port_, const std::string & logs_path) SimpliestRaftServer(int server_id_, const std::string & hostname_, int port_, const std::string & logs_path, const std::string & state_path)
: server_id(server_id_) : server_id(server_id_)
, hostname(hostname_) , hostname(hostname_)
, port(port_) , port(port_)
, endpoint(hostname + ":" + std::to_string(port)) , endpoint(hostname + ":" + std::to_string(port))
, state_machine(nuraft::cs_new<StateMachine>()) , state_machine(nuraft::cs_new<StateMachine>())
, state_manager(nuraft::cs_new<DB::KeeperStateManager>(server_id, hostname, port, logs_path)) , state_manager(nuraft::cs_new<DB::KeeperStateManager>(server_id, hostname, port, logs_path, state_path))
{ {
state_manager->loadLogStore(1, 0); state_manager->loadLogStore(1, 0);
nuraft::raft_params params; nuraft::raft_params params;
@ -185,7 +185,7 @@ nuraft::ptr<nuraft::buffer> getBuffer(int64_t number)
TEST_P(CoordinationTest, TestSummingRaft1) TEST_P(CoordinationTest, TestSummingRaft1)
{ {
ChangelogDirTest test("./logs"); ChangelogDirTest test("./logs");
SummingRaftServer s1(1, "localhost", 44444, "./logs"); SummingRaftServer s1(1, "localhost", 44444, "./logs", "./state");
/// Single node is leader /// Single node is leader
EXPECT_EQ(s1.raft_instance->get_leader(), 1); EXPECT_EQ(s1.raft_instance->get_leader(), 1);
@ -1064,6 +1064,13 @@ void addNode(DB::KeeperStorage & storage, const std::string & path, const std::s
node.setData(data); node.setData(data);
node.stat.ephemeralOwner = ephemeral_owner; node.stat.ephemeralOwner = ephemeral_owner;
storage.container.insertOrReplace(path, node); storage.container.insertOrReplace(path, node);
auto child_it = storage.container.find(path);
auto child_path = DB::getBaseName(child_it->key);
storage.container.updateValue(DB::parentPath(StringRef{path}), [&](auto & parent)
{
parent.addChild(child_path);
parent.stat.numChildren++;
});
} }
TEST_P(CoordinationTest, TestStorageSnapshotSimple) TEST_P(CoordinationTest, TestStorageSnapshotSimple)
@ -1221,7 +1228,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotMode)
storage.container.erase("/hello_" + std::to_string(i)); storage.container.erase("/hello_" + std::to_string(i));
} }
EXPECT_EQ(storage.container.size(), 26); EXPECT_EQ(storage.container.size(), 26);
EXPECT_EQ(storage.container.snapshotSizeWithVersion().first, 101); EXPECT_EQ(storage.container.snapshotSizeWithVersion().first, 102);
EXPECT_EQ(storage.container.snapshotSizeWithVersion().second, 1); EXPECT_EQ(storage.container.snapshotSizeWithVersion().second, 1);
auto buf = manager.serializeSnapshotToBuffer(snapshot); auto buf = manager.serializeSnapshotToBuffer(snapshot);
manager.serializeSnapshotBufferToDisk(*buf, 50); manager.serializeSnapshotBufferToDisk(*buf, 50);
@ -1776,6 +1783,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotEqual)
DB::KeeperSnapshotManager manager("./snapshots", 3, params.enable_compression); DB::KeeperSnapshotManager manager("./snapshots", 3, params.enable_compression);
DB::KeeperStorage storage(500, "", true); DB::KeeperStorage storage(500, "", true);
addNode(storage, "/hello", "");
for (size_t j = 0; j < 5000; ++j) for (size_t j = 0; j < 5000; ++j)
{ {
addNode(storage, "/hello_" + std::to_string(j), "world", 1); addNode(storage, "/hello_" + std::to_string(j), "world", 1);