diff --git a/src/Coordination/CoordinationSettings.h b/src/Coordination/CoordinationSettings.h index 34a97f82399..b4c8b1b9d2d 100644 --- a/src/Coordination/CoordinationSettings.h +++ b/src/Coordination/CoordinationSettings.h @@ -30,6 +30,7 @@ struct Settings; M(Milliseconds, startup_timeout, 30000, "How many time we will until RAFT to start", 0) \ M(LogsLevel, raft_logs_level, LogsLevel::information, "Log internal RAFT logs into main server log level. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \ M(UInt64, rotate_log_storage_interval, 100000, "How many records will be stored in one log storage file", 0) \ + M(UInt64, snapshots_to_keep, 3, "How many compressed snapshots to keep on disk", 0) \ M(Bool, force_sync, true, " Call fsync on each change in RAFT changelog", 0) DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS) diff --git a/src/Coordination/NuKeeperStateMachine.cpp b/src/Coordination/NuKeeperStateMachine.cpp index c31ac9f486d..b972838c6a2 100644 --- a/src/Coordination/NuKeeperStateMachine.cpp +++ b/src/Coordination/NuKeeperStateMachine.cpp @@ -32,7 +32,7 @@ NuKeeperStorage::RequestForSession parseRequest(nuraft::buffer & data) NuKeeperStateMachine::NuKeeperStateMachine(ResponsesQueue & responses_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_) : coordination_settings(coordination_settings_) , storage(coordination_settings->dead_session_check_period_ms.totalMilliseconds()) - , snapshot_manager(snapshots_path_) + , snapshot_manager(snapshots_path_, coordination_settings->snapshots_to_keep) , responses_queue(responses_queue_) , last_committed_idx(0) , log(&Poco::Logger::get("NuKeeperStateMachine")) diff --git a/src/Coordination/NuKeeperStorageSerializer.cpp b/src/Coordination/NuKeeperStorageSerializer.cpp index 6e5499ea180..1f23a42be69 100644 --- a/src/Coordination/NuKeeperStorageSerializer.cpp +++ b/src/Coordination/NuKeeperStorageSerializer.cpp @@ -156,8 +156,9 @@ NuKeeperStorageSnapshot::~NuKeeperStorageSnapshot() storage->disableSnapshotMode(); } -NuKeeperSnapshotManager::NuKeeperSnapshotManager(const std::string & snapshots_path_) +NuKeeperSnapshotManager::NuKeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_) : snapshots_path(snapshots_path_) + , snapshots_to_keep(snapshots_to_keep_) { namespace fs = std::filesystem; @@ -169,6 +170,8 @@ NuKeeperSnapshotManager::NuKeeperSnapshotManager(const std::string & snapshots_p size_t snapshot_up_to = getSnapshotPathUpToLogIdx(p.path()); existing_snapshots[snapshot_up_to] = p.path(); } + + removeOutdatedSnapshotsIfNeeded(); } @@ -182,6 +185,8 @@ std::string NuKeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffe copyData(reader, plain_buf); plain_buf.sync(); existing_snapshots.emplace(up_to_log_idx, new_snapshot_path); + removeOutdatedSnapshotsIfNeeded(); + return new_snapshot_path; } @@ -223,6 +228,11 @@ size_t NuKeeperSnapshotManager::restoreFromLatestSnapshot(NuKeeperStorage * stor return log_id; } +void NuKeeperSnapshotManager::removeOutdatedSnapshotsIfNeeded() +{ + while (existing_snapshots.size() > snapshots_to_keep) + existing_snapshots.erase(existing_snapshots.begin()); +} } diff --git a/src/Coordination/NuKeeperStorageSerializer.h b/src/Coordination/NuKeeperStorageSerializer.h index 67682f9cb71..5e6407835ec 100644 --- a/src/Coordination/NuKeeperStorageSerializer.h +++ b/src/Coordination/NuKeeperStorageSerializer.h @@ -37,7 +37,7 @@ public: class NuKeeperSnapshotManager { public: - explicit NuKeeperSnapshotManager(const std::string & snapshots_path_); + NuKeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_); size_t restoreFromLatestSnapshot(NuKeeperStorage * storage) const; @@ -48,7 +48,9 @@ public: nuraft::ptr deserializeSnapshotBufferFromDisk(size_t up_to_log_idx) const; private: + void removeOutdatedSnapshotsIfNeeded(); const std::string snapshots_path; + const size_t snapshots_to_keep; std::map existing_snapshots; }; diff --git a/src/Coordination/tests/gtest_for_build.cpp b/src/Coordination/tests/gtest_for_build.cpp index ba26794816e..0609e915ca8 100644 --- a/src/Coordination/tests/gtest_for_build.cpp +++ b/src/Coordination/tests/gtest_for_build.cpp @@ -895,16 +895,20 @@ TEST(CoordinationTest, SnapshotableHashMapTrySnapshot) map_snp.disableSnapshotMode(); } -TEST(CoordinationTest, TestStorageSnapshotSimple) +void addNode(DB::NuKeeperStorage & storage, const std::string & path, const std::string & data, int64_t ephemeral_owner=0) { using Node = DB::NuKeeperStorage::Node; + storage.container.insert(path, Node{.data=data, .ephemeral_owner = ephemeral_owner}); +} +TEST(CoordinationTest, TestStorageSnapshotSimple) +{ ChangelogDirTest test("./snapshots"); - DB::NuKeeperSnapshotManager manager("./snapshots"); + DB::NuKeeperSnapshotManager manager("./snapshots", 3); DB::NuKeeperStorage storage(500); - storage.container.insert("/hello", Node{.data="world", .ephemeral_owner = 1}); - storage.container.insert("/hello/somepath", Node{.data="somedata", .ephemeral_owner = 3}); + addNode(storage, "/hello", "world", 1); + addNode(storage, "/hello/somepath", "somedata", 3); storage.session_id_counter = 5; storage.zxid = 2; storage.ephemerals[3] = {"/hello"}; @@ -945,7 +949,6 @@ TEST(CoordinationTest, TestStorageSnapshotSimple) EXPECT_EQ(restored_storage.session_and_timeout.size(), 2); } - int main(int argc, char ** argv) { Poco::AutoPtr channel(new Poco::ConsoleChannel(std::cerr));