Add checksum to the state file

This commit is contained in:
Antonio Andelic 2022-07-10 15:01:38 +00:00
parent 2b26865f90
commit 183f5cc940
3 changed files with 66 additions and 21 deletions

View File

@ -202,7 +202,7 @@ KeeperStateManager::parseServersConfiguration(const Poco::Util::AbstractConfigur
KeeperStateManager::KeeperStateManager(int server_id_, const std::string & host, int port, const std::string & logs_path, const std::string & state_file_path)
: my_server_id(server_id_), secure(false), log_store(nuraft::cs_new<KeeperLogStore>(logs_path, 5000, false, false)),
server_state_path(state_file_path)
server_state_path(state_file_path), logger(&Poco::Logger::get("KeeperStateManager"))
{
auto peer_config = nuraft::cs_new<nuraft::srv_config>(my_server_id, host + ":" + std::to_string(port));
configuration_wrapper.cluster_config = nuraft::cs_new<nuraft::cluster_config>();
@ -228,6 +228,7 @@ KeeperStateManager::KeeperStateManager(
coordination_settings->force_sync,
coordination_settings->compress_logs))
, server_state_path(state_file_path)
, logger(&Poco::Logger::get("KeeperStateManager"))
{
}
@ -284,6 +285,12 @@ void KeeperStateManager::save_state(const nuraft::srv_state & state)
WriteBufferFromFile server_state_file(server_state_path, DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
auto buf = state.serialize();
// calculate checksum
SipHash hash;
hash.update(reinterpret_cast<const char *>(buf->data_begin()), buf->size());
writeIntBinary(hash.get64(), server_state_file);
server_state_file.write(reinterpret_cast<const char *>(buf->data_begin()), buf->size());
server_state_file.sync();
server_state_file.close();
@ -295,7 +302,7 @@ nuraft::ptr<nuraft::srv_state> KeeperStateManager::read_state()
{
const auto & old_path = getOldServerStatePath();
const auto try_read_file = [](const auto & path) -> nuraft::ptr<nuraft::srv_state>
const auto try_read_file = [this](const auto & path) -> nuraft::ptr<nuraft::srv_state>
{
ReadBufferFromFile read_buf(path);
auto content_size = read_buf.getFileSize();
@ -303,19 +310,36 @@ nuraft::ptr<nuraft::srv_state> KeeperStateManager::read_state()
if (content_size == 0)
return nullptr;
auto state_buf = nuraft::buffer::alloc(content_size);
read_buf.read(reinterpret_cast<char *>(state_buf->data_begin()), content_size);
assertEOF(read_buf);
try
{
uint64_t read_checksum;
readIntBinary(read_checksum, read_buf);
auto buffer_size = content_size - sizeof read_checksum;
auto state_buf = nuraft::buffer::alloc(buffer_size);
read_buf.read(reinterpret_cast<char *>(state_buf->data_begin()), buffer_size);
SipHash hash;
hash.update(reinterpret_cast<const char *>(state_buf->data_begin()), state_buf->size());
if (read_checksum != hash.get64())
{
LOG_WARNING(
logger,
"Invalid checksum while reading state from {}. Got {}, expected {}",
path.generic_string(),
hash.get64(),
read_checksum);
return nullptr;
}
auto state = nuraft::srv_state::deserialize(*state_buf);
LOG_INFO(&Poco::Logger::get("KeeperStateManager"), "Read state from {}", path.generic_string());
LOG_INFO(logger, "Read state from {}", path.generic_string());
return state;
}
catch (const std::overflow_error &)
catch (...)
{
LOG_WARNING(&Poco::Logger::get("KeeperStateManager"), "Failed to deserialize state from {}", path.generic_string());
LOG_WARNING(logger, "Failed to deserialize state from {}", path.generic_string());
return nullptr;
}
};
@ -338,7 +362,7 @@ nuraft::ptr<nuraft::srv_state> KeeperStateManager::read_state()
std::filesystem::rename(old_path, server_state_path);
}
LOG_INFO(&Poco::Logger::get("KeeperStateManager"), "No state was read");
LOG_INFO(logger, "No state was read");
return nullptr;
}

View File

@ -138,6 +138,8 @@ private:
const std::filesystem::path server_state_path;
Poco::Logger * logger;
public:
/// Parse configuration from xml config.
KeeperConfigurationWrapper parseServersConfiguration(const Poco::Util::AbstractConfiguration & config, bool allow_without_us) const;

View File

@ -4,6 +4,7 @@
#include "Coordination/KeeperStorage.h"
#include "Core/Defines.h"
#include "IO/WriteHelpers.h"
#include "config_core.h"
#if USE_NURAFT
@ -2082,19 +2083,37 @@ TEST_P(CoordinationTest, TestDurableState)
reload_state_manager();
assert_read_state();
state_manager.reset();
{
SCOPED_TRACE("Read from corrupted file");
state_manager.reset();
DB::WriteBufferFromFile write_buf("./state", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY);
write_buf.seek(20, SEEK_SET);
DB::writeIntBinary(31, write_buf);
write_buf.sync();
write_buf.close();
reload_state_manager();
ASSERT_EQ(state_manager->read_state(), nullptr);
}
DB::WriteBufferFromFile write_buf("./state", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_WRONLY);
write_buf.write(20);
write_buf.sync();
write_buf.close();
reload_state_manager();
ASSERT_EQ(state_manager->read_state(), nullptr);
{
SCOPED_TRACE("Read from file with invalid size");
state_manager.reset();
state_manager.reset();
std::filesystem::remove("./state");
reload_state_manager();
ASSERT_EQ(state_manager->read_state(), nullptr);
DB::WriteBufferFromFile write_buf("./state", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_WRONLY);
DB::writeIntBinary(20, write_buf);
write_buf.sync();
write_buf.close();
reload_state_manager();
ASSERT_EQ(state_manager->read_state(), nullptr);
}
{
SCOPED_TRACE("State file is missing");
state_manager.reset();
std::filesystem::remove("./state");
reload_state_manager();
ASSERT_EQ(state_manager->read_state(), nullptr);
}
}
INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite,