diff --git a/src/Coordination/KeeperStateManager.cpp b/src/Coordination/KeeperStateManager.cpp index 986698af96d..0f6dac661b5 100644 --- a/src/Coordination/KeeperStateManager.cpp +++ b/src/Coordination/KeeperStateManager.cpp @@ -202,7 +202,7 @@ KeeperStateManager::parseServersConfiguration(const Poco::Util::AbstractConfigur KeeperStateManager::KeeperStateManager(int server_id_, const std::string & host, int port, const std::string & logs_path, const std::string & state_file_path) : my_server_id(server_id_), secure(false), log_store(nuraft::cs_new(logs_path, 5000, false, false)), - server_state_path(state_file_path) + server_state_path(state_file_path), logger(&Poco::Logger::get("KeeperStateManager")) { auto peer_config = nuraft::cs_new(my_server_id, host + ":" + std::to_string(port)); configuration_wrapper.cluster_config = nuraft::cs_new(); @@ -228,6 +228,7 @@ KeeperStateManager::KeeperStateManager( coordination_settings->force_sync, coordination_settings->compress_logs)) , server_state_path(state_file_path) + , logger(&Poco::Logger::get("KeeperStateManager")) { } @@ -284,6 +285,12 @@ void KeeperStateManager::save_state(const nuraft::srv_state & state) WriteBufferFromFile server_state_file(server_state_path, DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY); auto buf = state.serialize(); + + // calculate checksum + SipHash hash; + hash.update(reinterpret_cast(buf->data_begin()), buf->size()); + writeIntBinary(hash.get64(), server_state_file); + server_state_file.write(reinterpret_cast(buf->data_begin()), buf->size()); server_state_file.sync(); server_state_file.close(); @@ -295,7 +302,7 @@ nuraft::ptr KeeperStateManager::read_state() { const auto & old_path = getOldServerStatePath(); - const auto try_read_file = [](const auto & path) -> nuraft::ptr + const auto try_read_file = [this](const auto & path) -> nuraft::ptr { ReadBufferFromFile read_buf(path); auto content_size = read_buf.getFileSize(); @@ -303,19 +310,36 @@ nuraft::ptr KeeperStateManager::read_state() if (content_size == 0) return nullptr; - auto state_buf = nuraft::buffer::alloc(content_size); - read_buf.read(reinterpret_cast(state_buf->data_begin()), content_size); - assertEOF(read_buf); - try { + uint64_t read_checksum; + readIntBinary(read_checksum, read_buf); + + auto buffer_size = content_size - sizeof read_checksum; + auto state_buf = nuraft::buffer::alloc(buffer_size); + read_buf.read(reinterpret_cast(state_buf->data_begin()), buffer_size); + + SipHash hash; + hash.update(reinterpret_cast(state_buf->data_begin()), state_buf->size()); + + if (read_checksum != hash.get64()) + { + LOG_WARNING( + logger, + "Invalid checksum while reading state from {}. Got {}, expected {}", + path.generic_string(), + hash.get64(), + read_checksum); + return nullptr; + } + auto state = nuraft::srv_state::deserialize(*state_buf); - LOG_INFO(&Poco::Logger::get("KeeperStateManager"), "Read state from {}", path.generic_string()); + LOG_INFO(logger, "Read state from {}", path.generic_string()); return state; } - catch (const std::overflow_error &) + catch (...) { - LOG_WARNING(&Poco::Logger::get("KeeperStateManager"), "Failed to deserialize state from {}", path.generic_string()); + LOG_WARNING(logger, "Failed to deserialize state from {}", path.generic_string()); return nullptr; } }; @@ -338,7 +362,7 @@ nuraft::ptr KeeperStateManager::read_state() std::filesystem::rename(old_path, server_state_path); } - LOG_INFO(&Poco::Logger::get("KeeperStateManager"), "No state was read"); + LOG_INFO(logger, "No state was read"); return nullptr; } diff --git a/src/Coordination/KeeperStateManager.h b/src/Coordination/KeeperStateManager.h index 43c33b40014..5d210f8c0ea 100644 --- a/src/Coordination/KeeperStateManager.h +++ b/src/Coordination/KeeperStateManager.h @@ -138,6 +138,8 @@ private: const std::filesystem::path server_state_path; + Poco::Logger * logger; + public: /// Parse configuration from xml config. KeeperConfigurationWrapper parseServersConfiguration(const Poco::Util::AbstractConfiguration & config, bool allow_without_us) const; diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index 0711cb42b4e..1f0d2bb762d 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -4,6 +4,7 @@ #include "Coordination/KeeperStorage.h" #include "Core/Defines.h" +#include "IO/WriteHelpers.h" #include "config_core.h" #if USE_NURAFT @@ -2082,19 +2083,37 @@ TEST_P(CoordinationTest, TestDurableState) reload_state_manager(); assert_read_state(); - state_manager.reset(); + { + SCOPED_TRACE("Read from corrupted file"); + state_manager.reset(); + DB::WriteBufferFromFile write_buf("./state", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY); + write_buf.seek(20, SEEK_SET); + DB::writeIntBinary(31, write_buf); + write_buf.sync(); + write_buf.close(); + reload_state_manager(); + ASSERT_EQ(state_manager->read_state(), nullptr); + } - DB::WriteBufferFromFile write_buf("./state", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_WRONLY); - write_buf.write(20); - write_buf.sync(); - write_buf.close(); - reload_state_manager(); - ASSERT_EQ(state_manager->read_state(), nullptr); + { + SCOPED_TRACE("Read from file with invalid size"); + state_manager.reset(); - state_manager.reset(); - std::filesystem::remove("./state"); - reload_state_manager(); - ASSERT_EQ(state_manager->read_state(), nullptr); + DB::WriteBufferFromFile write_buf("./state", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_WRONLY); + DB::writeIntBinary(20, write_buf); + write_buf.sync(); + write_buf.close(); + reload_state_manager(); + ASSERT_EQ(state_manager->read_state(), nullptr); + } + + { + SCOPED_TRACE("State file is missing"); + state_manager.reset(); + std::filesystem::remove("./state"); + reload_state_manager(); + ASSERT_EQ(state_manager->read_state(), nullptr); + } } INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite,