From aa41ad12f66a56cb61b9a71bc8add9da20179808 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Mon, 6 May 2024 15:38:29 +0200 Subject: [PATCH] address comments --- src/Coordination/KeeperContext.h | 2 +- src/Coordination/KeeperStorage.cpp | 84 +- src/Coordination/KeeperStorage.h | 7 + src/Coordination/RocksDBContainer.h | 21 + src/Coordination/tests/gtest_coordination.cpp | 1012 ++++++++------- src/Coordination/tests/gtest_rocks_keeper.cpp | 1150 ----------------- 6 files changed, 627 insertions(+), 1649 deletions(-) delete mode 100644 src/Coordination/tests/gtest_rocks_keeper.cpp diff --git a/src/Coordination/KeeperContext.h b/src/Coordination/KeeperContext.h index c9abd13b6e9..38013725f56 100644 --- a/src/Coordination/KeeperContext.h +++ b/src/Coordination/KeeperContext.h @@ -70,7 +70,7 @@ public: void setRocksDBDisk(DiskPtr disk); DiskPtr getTemporaryRocksDBDisk() const; - void setRocksDBOptions(std::shared_ptr rocksdb_options_); + void setRocksDBOptions(std::shared_ptr rocksdb_options_ = nullptr); std::shared_ptr getRocksDBOptions() const { return rocksdb_options; } UInt64 getKeeperMemorySoftLimit() const { return memory_soft_limit; } diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 905fd06359c..73d735c0af8 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1745,63 +1745,53 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc if (path_prefix.empty()) throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Path cannot be empty"); - if constexpr (Storage::use_rocksdb) + const auto & get_children = [&]() { - const auto & children = container.getChildren(request.path); - response.names.reserve(children.size()); - const auto add_child = [&](const auto & child) + if constexpr (Storage::use_rocksdb) + return container.getChildren(request.path); + else + return node_it->value.getChildren(); + }; + const auto & children = get_children(); + response.names.reserve(children.size()); + + const auto add_child = [&](const auto & child) + { + using enum Coordination::ListRequestType; + + auto list_request_type = ALL; + if (auto * filtered_list = dynamic_cast(&request)) { - using enum Coordination::ListRequestType; - - auto list_request_type = ALL; - if (auto * filtered_list = dynamic_cast(&request)) - { - list_request_type = filtered_list->list_request_type; - } - - if (list_request_type == ALL) - return true; - - const auto is_ephemeral = child.second.isEphemeral(); - return (is_ephemeral && list_request_type == EPHEMERAL_ONLY) || (!is_ephemeral && list_request_type == PERSISTENT_ONLY); - }; - - for (const auto & child : children) - { - if (add_child(child)) - response.names.push_back(child.first); + list_request_type = filtered_list->list_request_type; } - } - else - { - const auto & children = node_it->value.getChildren(); - response.names.reserve(children.size()); - const auto add_child = [&](const auto child) + if (list_request_type == ALL) + return true; + + bool is_ephemeral; + if constexpr (!Storage::use_rocksdb) { - using enum Coordination::ListRequestType; - - auto list_request_type = ALL; - if (auto * filtered_list = dynamic_cast(&request)) - { - list_request_type = filtered_list->list_request_type; - } - - if (list_request_type == ALL) - return true; - auto child_path = (std::filesystem::path(request.path) / child.toView()).generic_string(); auto child_it = container.find(child_path); if (child_it == container.end()) onStorageInconsistency(); - - const auto is_ephemeral = child_it->value.isEphemeral(); - return (is_ephemeral && list_request_type == EPHEMERAL_ONLY) || (!is_ephemeral && list_request_type == PERSISTENT_ONLY); - }; - - for (const auto child : children) + is_ephemeral = child_it->value.isEphemeral(); + } + else { - if (add_child(child)) + is_ephemeral = child.second.isEphemeral(); + } + + return (is_ephemeral && list_request_type == EPHEMERAL_ONLY) || (!is_ephemeral && list_request_type == PERSISTENT_ONLY); + }; + + for (const auto & child : children) + { + if (add_child(child)) + { + if constexpr (Storage::use_rocksdb) + response.names.push_back(child.first); + else response.names.push_back(child.toString()); } } diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 1d47061a17c..c2fd196b95e 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -93,6 +93,13 @@ struct KeeperRocksNodeInfo ephemeral_or_children_data.children_info.num_children = num_children; } + /// dummy interface for test + void addChild(StringRef) {} + auto getChildren() const + { + return std::vector(numChildren()); + } + void increaseNumChildren() { chassert(!isEphemeral()); diff --git a/src/Coordination/RocksDBContainer.h b/src/Coordination/RocksDBContainer.h index 75cc2a9e555..a4a236f332e 100644 --- a/src/Coordination/RocksDBContainer.h +++ b/src/Coordination/RocksDBContainer.h @@ -31,6 +31,20 @@ struct RocksDBContainer using Node = Node_; private: + /// MockNode is only use in test to mock `getChildren()` and `getData()` + struct MockNode + { + std::vector children; + std::string data; + MockNode(size_t children_num, std::string_view data_) + : children(std::vector(children_num)), + data(data_) + { + } + + std::vector getChildren() { return children; } + std::string getData() { return data; } + }; UInt16 getKeyDepth(const std::string & key) { @@ -262,6 +276,13 @@ public: return const_iterator(kv); } + MockNode getValue(StringRef key) + { + auto it = find(key); + chassert(it != end()); + return MockNode(it->value.numChildren(), it->value.getData()); + } + const_iterator updateValue(StringRef key_, ValueUpdater updater) { /// rocksdb::PinnableSlice slice; diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index c303db11474..9f0937572a9 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -60,10 +60,22 @@ struct CompressionParam std::string extension; }; -class CoordinationTest : public ::testing::TestWithParam +template +struct TestParam { -protected: - DB::KeeperContextPtr keeper_context = std::make_shared(true, std::make_shared()); + using Storage = Storage_; + static constexpr bool enable_compression = enable_compression_; +}; + +template +class CoordinationTest : public ::testing::Test +{ +public: + using Storage = typename TestType::Storage; + static constexpr bool enable_compression = TestType::enable_compression; + std::string extension; + + DB::KeeperContextPtr keeper_context; LoggerPtr log{getLogger("CoordinationTest")}; void SetUp() override @@ -72,7 +84,12 @@ protected: Poco::Logger::root().setChannel(channel); Poco::Logger::root().setLevel("trace"); + auto settings = std::make_shared(); + settings->use_rocksdb = true; + keeper_context = std::make_shared(true, settings); keeper_context->setLocalLogsPreprocessed(); + keeper_context->setRocksDBOptions(); + extension = enable_compression ? ".zstd" : ""; } void setLogDirectory(const std::string & path) { keeper_context->setLogDisk(std::make_shared("LogDisk", path)); } @@ -82,13 +99,24 @@ protected: keeper_context->setSnapshotDisk(std::make_shared("SnapshotDisk", path)); } + void setRocksDBDirectory(const std::string & path) + { + keeper_context->setRocksDBDisk(std::make_shared("RocksDisk", path)); + } + void setStateFileDirectory(const std::string & path) { keeper_context->setStateFileDisk(std::make_shared("StateFile", path)); } }; -TEST_P(CoordinationTest, RaftServerConfigParse) +using Implementation = testing::Types, + TestParam, + TestParam, + TestParam>; +TYPED_TEST_SUITE(CoordinationTest, Implementation); + +TYPED_TEST(CoordinationTest, RaftServerConfigParse) { auto parse = Coordination::RaftServerConfig::parse; using Cfg = std::optional; @@ -113,7 +141,7 @@ TEST_P(CoordinationTest, RaftServerConfigParse) (Cfg{{1, "2001:0db8:85a3:0000:0000:8a2e:0370:7334:80"}})); } -TEST_P(CoordinationTest, RaftServerClusterConfigParse) +TYPED_TEST(CoordinationTest, RaftServerClusterConfigParse) { auto parse = Coordination::parseRaftServers; using Cfg = DB::RaftServerConfig; @@ -129,14 +157,14 @@ TEST_P(CoordinationTest, RaftServerClusterConfigParse) (Servers{Cfg{1, "host:80"}, Cfg{2, "host:81"}})); } -TEST_P(CoordinationTest, BuildTest) +TYPED_TEST(CoordinationTest, BuildTest) { DB::InMemoryLogStore store; DB::SummingStateMachine machine; EXPECT_EQ(1, 1); } -TEST_P(CoordinationTest, BufferSerde) +TYPED_TEST(CoordinationTest, BufferSerde) { Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Get); request->xid = 3; @@ -260,13 +288,13 @@ nuraft::ptr getBuffer(int64_t number) return ret; } -TEST_P(CoordinationTest, TestSummingRaft1) +TYPED_TEST(CoordinationTest, TestSummingRaft1) { ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); - setStateFileDirectory("."); + this->setLogDirectory("./logs"); + this->setStateFileDirectory("."); - SummingRaftServer s1(1, "localhost", 44444, keeper_context); + SummingRaftServer s1(1, "localhost", 44444, this->keeper_context); SCOPE_EXIT(if (std::filesystem::exists("./state")) std::filesystem::remove("./state");); /// Single node is leader @@ -279,7 +307,7 @@ TEST_P(CoordinationTest, TestSummingRaft1) while (s1.state_machine->getValue() != 143) { - LOG_INFO(log, "Waiting s1 to apply entry"); + LOG_INFO(this->log, "Waiting s1 to apply entry"); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } @@ -295,16 +323,16 @@ DB::LogEntryPtr getLogEntry(const std::string & s, size_t term) return nuraft::cs_new(term, bufwriter.getBuffer()); } -TEST_P(CoordinationTest, ChangelogTestSimple) +TYPED_TEST(CoordinationTest, ChangelogTestSimple) { - auto params = GetParam(); + /// ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 5}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); auto entry = getLogEntry("hello world", 77); changelog.append(entry); @@ -327,16 +355,15 @@ void waitDurableLogs(nuraft::log_store & log_store) } -TEST_P(CoordinationTest, ChangelogTestFile) +TYPED_TEST(CoordinationTest, ChangelogTestFile) { - auto params = GetParam(); ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 5}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); auto entry = getLogEntry("hello world", 77); changelog.append(entry); @@ -344,9 +371,9 @@ TEST_P(CoordinationTest, ChangelogTestFile) waitDurableLogs(changelog); - EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + this->extension)); for (const auto & p : fs::directory_iterator("./logs")) - EXPECT_EQ(p.path(), "./logs/changelog_1_5.bin" + params.extension); + EXPECT_EQ(p.path(), "./logs/changelog_1_5.bin" + this->extension); changelog.append(entry); changelog.append(entry); @@ -357,20 +384,20 @@ TEST_P(CoordinationTest, ChangelogTestFile) waitDurableLogs(changelog); - EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + this->extension)); } -TEST_P(CoordinationTest, ChangelogReadWrite) +TYPED_TEST(CoordinationTest, ChangelogReadWrite) { - auto params = GetParam(); + ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 1000}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 1000}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); for (size_t i = 0; i < 10; ++i) @@ -385,9 +412,9 @@ TEST_P(CoordinationTest, ChangelogReadWrite) waitDurableLogs(changelog); DB::KeeperLogStore changelog_reader( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 1000}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 1000}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog_reader.init(1, 0); EXPECT_EQ(changelog_reader.size(), 10); EXPECT_EQ(changelog_reader.last_entry()->get_term(), changelog.last_entry()->get_term()); @@ -403,16 +430,16 @@ TEST_P(CoordinationTest, ChangelogReadWrite) EXPECT_EQ(10, entries_from_range->size()); } -TEST_P(CoordinationTest, ChangelogWriteAt) +TYPED_TEST(CoordinationTest, ChangelogWriteAt) { - auto params = GetParam(); + ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 1000}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 1000}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); for (size_t i = 0; i < 10; ++i) { @@ -435,9 +462,9 @@ TEST_P(CoordinationTest, ChangelogWriteAt) EXPECT_EQ(changelog.next_slot(), 8); DB::KeeperLogStore changelog_reader( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 1000}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 1000}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog_reader.init(1, 0); EXPECT_EQ(changelog_reader.size(), changelog.size()); @@ -447,16 +474,16 @@ TEST_P(CoordinationTest, ChangelogWriteAt) } -TEST_P(CoordinationTest, ChangelogTestAppendAfterRead) +TYPED_TEST(CoordinationTest, ChangelogTestAppendAfterRead) { - auto params = GetParam(); + ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 5}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); for (size_t i = 0; i < 7; ++i) { @@ -469,13 +496,13 @@ TEST_P(CoordinationTest, ChangelogTestAppendAfterRead) waitDurableLogs(changelog); - EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + this->extension)); DB::KeeperLogStore changelog_reader( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 5}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog_reader.init(1, 0); EXPECT_EQ(changelog_reader.size(), 7); @@ -488,8 +515,8 @@ TEST_P(CoordinationTest, ChangelogTestAppendAfterRead) EXPECT_EQ(changelog_reader.size(), 10); waitDurableLogs(changelog_reader); - EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + this->extension)); size_t logs_count = 0; for (const auto & _ [[maybe_unused]] : fs::directory_iterator("./logs")) @@ -504,9 +531,9 @@ TEST_P(CoordinationTest, ChangelogTestAppendAfterRead) waitDurableLogs(changelog_reader); - EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + this->extension)); logs_count = 0; for (const auto & _ [[maybe_unused]] : fs::directory_iterator("./logs")) @@ -533,16 +560,16 @@ void assertFileDeleted(std::string path) } -TEST_P(CoordinationTest, ChangelogTestCompaction) +TYPED_TEST(CoordinationTest, ChangelogTestCompaction) { - auto params = GetParam(); + ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 5}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); for (size_t i = 0; i < 3; ++i) @@ -556,7 +583,7 @@ TEST_P(CoordinationTest, ChangelogTestCompaction) EXPECT_EQ(changelog.size(), 3); - keeper_context->setLastCommitIndex(2); + this->keeper_context->setLastCommitIndex(2); changelog.compact(2); EXPECT_EQ(changelog.size(), 1); @@ -564,7 +591,7 @@ TEST_P(CoordinationTest, ChangelogTestCompaction) EXPECT_EQ(changelog.next_slot(), 4); EXPECT_EQ(changelog.last_entry()->get_term(), 20); // nothing should be deleted - EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + this->extension)); auto e1 = getLogEntry("hello world", 30); changelog.append(e1); @@ -578,15 +605,15 @@ TEST_P(CoordinationTest, ChangelogTestCompaction) waitDurableLogs(changelog); - EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + this->extension)); - keeper_context->setLastCommitIndex(6); + this->keeper_context->setLastCommitIndex(6); changelog.compact(6); std::this_thread::sleep_for(std::chrono::microseconds(1000)); - assertFileDeleted("./logs/changelog_1_5.bin" + params.extension); - EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension)); + assertFileDeleted("./logs/changelog_1_5.bin" + this->extension); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + this->extension)); EXPECT_EQ(changelog.size(), 1); EXPECT_EQ(changelog.start_index(), 7); @@ -594,9 +621,9 @@ TEST_P(CoordinationTest, ChangelogTestCompaction) EXPECT_EQ(changelog.last_entry()->get_term(), 60); /// And we able to read it DB::KeeperLogStore changelog_reader( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 5}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog_reader.init(7, 0); EXPECT_EQ(changelog_reader.size(), 1); @@ -605,16 +632,16 @@ TEST_P(CoordinationTest, ChangelogTestCompaction) EXPECT_EQ(changelog_reader.last_entry()->get_term(), 60); } -TEST_P(CoordinationTest, ChangelogTestBatchOperations) +TYPED_TEST(CoordinationTest, ChangelogTestBatchOperations) { - auto params = GetParam(); + ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); for (size_t i = 0; i < 10; ++i) { @@ -630,9 +657,9 @@ TEST_P(CoordinationTest, ChangelogTestBatchOperations) auto entries = changelog.pack(1, 5); DB::KeeperLogStore apply_changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); apply_changelog.init(1, 0); for (size_t i = 0; i < 10; ++i) @@ -660,18 +687,18 @@ TEST_P(CoordinationTest, ChangelogTestBatchOperations) EXPECT_EQ(apply_changelog.entry_at(12)->get_term(), 40); } -TEST_P(CoordinationTest, ChangelogTestBatchOperationsEmpty) +TYPED_TEST(CoordinationTest, ChangelogTestBatchOperationsEmpty) { - auto params = GetParam(); + ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); nuraft::ptr entries; { DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); for (size_t i = 0; i < 10; ++i) { @@ -688,11 +715,11 @@ TEST_P(CoordinationTest, ChangelogTestBatchOperationsEmpty) } ChangelogDirTest test1("./logs1"); - setLogDirectory("./logs1"); + this->setLogDirectory("./logs1"); DB::KeeperLogStore changelog_new( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog_new.init(1, 0); EXPECT_EQ(changelog_new.size(), 0); @@ -715,23 +742,23 @@ TEST_P(CoordinationTest, ChangelogTestBatchOperationsEmpty) EXPECT_EQ(changelog_new.next_slot(), 11); DB::KeeperLogStore changelog_reader( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog_reader.init(5, 0); } -TEST_P(CoordinationTest, ChangelogTestWriteAtPreviousFile) +TYPED_TEST(CoordinationTest, ChangelogTestWriteAtPreviousFile) { - auto params = GetParam(); + ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 5}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); for (size_t i = 0; i < 33; ++i) @@ -743,13 +770,13 @@ TEST_P(CoordinationTest, ChangelogTestWriteAtPreviousFile) waitDurableLogs(changelog); - EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin" + this->extension)); EXPECT_EQ(changelog.size(), 33); @@ -763,19 +790,19 @@ TEST_P(CoordinationTest, ChangelogTestWriteAtPreviousFile) waitDurableLogs(changelog); - EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + this->extension)); - EXPECT_FALSE(fs::exists("./logs/changelog_11_15.bin" + params.extension)); - EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin" + params.extension)); - EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin" + params.extension)); - EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin" + params.extension)); - EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin" + params.extension)); + EXPECT_FALSE(fs::exists("./logs/changelog_11_15.bin" + this->extension)); + EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin" + this->extension)); + EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin" + this->extension)); + EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin" + this->extension)); + EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin" + this->extension)); DB::KeeperLogStore changelog_read( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 5}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog_read.init(1, 0); EXPECT_EQ(changelog_read.size(), 7); EXPECT_EQ(changelog_read.start_index(), 1); @@ -783,16 +810,16 @@ TEST_P(CoordinationTest, ChangelogTestWriteAtPreviousFile) EXPECT_EQ(changelog_read.last_entry()->get_term(), 5555); } -TEST_P(CoordinationTest, ChangelogTestWriteAtFileBorder) +TYPED_TEST(CoordinationTest, ChangelogTestWriteAtFileBorder) { - auto params = GetParam(); + ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 5}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); for (size_t i = 0; i < 33; ++i) @@ -804,13 +831,13 @@ TEST_P(CoordinationTest, ChangelogTestWriteAtFileBorder) waitDurableLogs(changelog); - EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin" + this->extension)); EXPECT_EQ(changelog.size(), 33); @@ -824,19 +851,19 @@ TEST_P(CoordinationTest, ChangelogTestWriteAtFileBorder) waitDurableLogs(changelog); - EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + this->extension)); - EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin" + params.extension)); - EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin" + params.extension)); - EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin" + params.extension)); - EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin" + params.extension)); + EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin" + this->extension)); + EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin" + this->extension)); + EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin" + this->extension)); + EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin" + this->extension)); DB::KeeperLogStore changelog_read( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 5}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog_read.init(1, 0); EXPECT_EQ(changelog_read.size(), 11); EXPECT_EQ(changelog_read.start_index(), 1); @@ -844,16 +871,16 @@ TEST_P(CoordinationTest, ChangelogTestWriteAtFileBorder) EXPECT_EQ(changelog_read.last_entry()->get_term(), 5555); } -TEST_P(CoordinationTest, ChangelogTestWriteAtAllFiles) +TYPED_TEST(CoordinationTest, ChangelogTestWriteAtAllFiles) { - auto params = GetParam(); + ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 5}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); for (size_t i = 0; i < 33; ++i) { @@ -864,13 +891,13 @@ TEST_P(CoordinationTest, ChangelogTestWriteAtAllFiles) waitDurableLogs(changelog); - EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin" + this->extension)); EXPECT_EQ(changelog.size(), 33); @@ -884,26 +911,26 @@ TEST_P(CoordinationTest, ChangelogTestWriteAtAllFiles) waitDurableLogs(changelog); - EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + this->extension)); - EXPECT_FALSE(fs::exists("./logs/changelog_6_10.bin" + params.extension)); - EXPECT_FALSE(fs::exists("./logs/changelog_11_15.bin" + params.extension)); - EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin" + params.extension)); - EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin" + params.extension)); - EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin" + params.extension)); - EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin" + params.extension)); + EXPECT_FALSE(fs::exists("./logs/changelog_6_10.bin" + this->extension)); + EXPECT_FALSE(fs::exists("./logs/changelog_11_15.bin" + this->extension)); + EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin" + this->extension)); + EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin" + this->extension)); + EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin" + this->extension)); + EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin" + this->extension)); } -TEST_P(CoordinationTest, ChangelogTestStartNewLogAfterRead) +TYPED_TEST(CoordinationTest, ChangelogTestStartNewLogAfterRead) { - auto params = GetParam(); + ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 5}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); for (size_t i = 0; i < 35; ++i) @@ -915,19 +942,19 @@ TEST_P(CoordinationTest, ChangelogTestStartNewLogAfterRead) EXPECT_EQ(changelog.size(), 35); waitDurableLogs(changelog); - EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin" + params.extension)); - EXPECT_FALSE(fs::exists("./logs/changelog_36_40.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin" + this->extension)); + EXPECT_FALSE(fs::exists("./logs/changelog_36_40.bin" + this->extension)); DB::KeeperLogStore changelog_reader( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 5}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog_reader.init(1, 0); auto entry = getLogEntry("36_hello_world", 360); @@ -937,14 +964,14 @@ TEST_P(CoordinationTest, ChangelogTestStartNewLogAfterRead) EXPECT_EQ(changelog_reader.size(), 36); waitDurableLogs(changelog_reader); - EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_36_40.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_36_40.bin" + this->extension)); } namespace @@ -965,18 +992,18 @@ void assertBrokenFileRemoved(const fs::path & directory, const fs::path & filena } -TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate) +TYPED_TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate) { static const fs::path log_folder{"./logs"}; - auto params = GetParam(); + ChangelogDirTest test(log_folder); - setLogDirectory(log_folder); + this->setLogDirectory(log_folder); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 5}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); for (size_t i = 0; i < 35; ++i) @@ -988,36 +1015,36 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate) EXPECT_EQ(changelog.size(), 35); waitDurableLogs(changelog); - EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin" + this->extension)); DB::WriteBufferFromFile plain_buf( - "./logs/changelog_11_15.bin" + params.extension, DB::DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY); + "./logs/changelog_11_15.bin" + this->extension, DB::DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY); plain_buf.truncate(0); DB::KeeperLogStore changelog_reader( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 5}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog_reader.init(1, 0); changelog_reader.end_of_append_batch(0, 0); EXPECT_EQ(changelog_reader.size(), 10); EXPECT_EQ(changelog_reader.last_entry()->get_term(), 90); - EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + this->extension)); - assertBrokenFileRemoved(log_folder, "changelog_16_20.bin" + params.extension); - assertBrokenFileRemoved(log_folder, "changelog_21_25.bin" + params.extension); - assertBrokenFileRemoved(log_folder, "changelog_26_30.bin" + params.extension); - assertBrokenFileRemoved(log_folder, "changelog_31_35.bin" + params.extension); + assertBrokenFileRemoved(log_folder, "changelog_16_20.bin" + this->extension); + assertBrokenFileRemoved(log_folder, "changelog_21_25.bin" + this->extension); + assertBrokenFileRemoved(log_folder, "changelog_26_30.bin" + this->extension); + assertBrokenFileRemoved(log_folder, "changelog_31_35.bin" + this->extension); auto entry = getLogEntry("h", 7777); changelog_reader.append(entry); @@ -1027,35 +1054,35 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate) waitDurableLogs(changelog_reader); - EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + this->extension)); - assertBrokenFileRemoved(log_folder, "changelog_16_20.bin" + params.extension); - assertBrokenFileRemoved(log_folder, "changelog_21_25.bin" + params.extension); - assertBrokenFileRemoved(log_folder, "changelog_26_30.bin" + params.extension); - assertBrokenFileRemoved(log_folder, "changelog_31_35.bin" + params.extension); + assertBrokenFileRemoved(log_folder, "changelog_16_20.bin" + this->extension); + assertBrokenFileRemoved(log_folder, "changelog_21_25.bin" + this->extension); + assertBrokenFileRemoved(log_folder, "changelog_26_30.bin" + this->extension); + assertBrokenFileRemoved(log_folder, "changelog_31_35.bin" + this->extension); DB::KeeperLogStore changelog_reader2( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 5}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog_reader2.init(1, 0); EXPECT_EQ(changelog_reader2.size(), 11); EXPECT_EQ(changelog_reader2.last_entry()->get_term(), 7777); } /// Truncating all entries -TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2) +TYPED_TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2) { - auto params = GetParam(); + ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 20}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); for (size_t i = 0; i < 35; ++i) @@ -1066,22 +1093,22 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2) changelog.end_of_append_batch(0, 0); waitDurableLogs(changelog); - EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin" + this->extension)); DB::WriteBufferFromFile plain_buf( - "./logs/changelog_1_20.bin" + params.extension, DB::DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY); + "./logs/changelog_1_20.bin" + this->extension, DB::DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY); plain_buf.truncate(30); DB::KeeperLogStore changelog_reader( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 20}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog_reader.init(1, 0); EXPECT_EQ(changelog_reader.size(), 0); - EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + params.extension)); - assertBrokenFileRemoved("./logs", "changelog_21_40.bin" + params.extension); + EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + this->extension)); + assertBrokenFileRemoved("./logs", "changelog_21_40.bin" + this->extension); auto entry = getLogEntry("hello_world", 7777); changelog_reader.append(entry); changelog_reader.end_of_append_batch(0, 0); @@ -1092,9 +1119,9 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2) EXPECT_EQ(changelog_reader.last_entry()->get_term(), 7777); DB::KeeperLogStore changelog_reader2( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 1}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 1}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog_reader2.init(1, 0); EXPECT_EQ(changelog_reader2.size(), 1); EXPECT_EQ(changelog_reader2.last_entry()->get_term(), 7777); @@ -1103,15 +1130,15 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2) /// Truncating only some entries from the end /// For compressed logs we have no reliable way of knowing how many log entries were lost /// after we truncate some bytes from the end -TEST_F(CoordinationTest, ChangelogTestReadAfterBrokenTruncate3) +TYPED_TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate3) { ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); DB::KeeperLogStore changelog( DB::LogFileSettings{.force_sync = true, .compress_logs = false, .rotate_interval = 20}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); for (size_t i = 0; i < 35; ++i) @@ -1133,7 +1160,7 @@ TEST_F(CoordinationTest, ChangelogTestReadAfterBrokenTruncate3) DB::KeeperLogStore changelog_reader( DB::LogFileSettings{.force_sync = true, .compress_logs = false, .rotate_interval = 20}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog_reader.init(1, 0); EXPECT_EQ(changelog_reader.size(), 19); @@ -1150,10 +1177,10 @@ TEST_F(CoordinationTest, ChangelogTestReadAfterBrokenTruncate3) EXPECT_EQ(changelog_reader.last_entry()->get_term(), 7777); } -TEST_F(CoordinationTest, ChangelogTestMixedLogTypes) +TYPED_TEST(CoordinationTest, ChangelogTestMixedLogTypes) { ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); std::vector changelog_files; @@ -1185,7 +1212,7 @@ TEST_F(CoordinationTest, ChangelogTestMixedLogTypes) DB::KeeperLogStore changelog( DB::LogFileSettings{.force_sync = true, .compress_logs = false, .rotate_interval = 20}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); for (size_t i = 0; i < 35; ++i) @@ -1206,7 +1233,7 @@ TEST_F(CoordinationTest, ChangelogTestMixedLogTypes) DB::KeeperLogStore changelog_compressed( DB::LogFileSettings{.force_sync = true, .compress_logs = true, .rotate_interval = 20}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog_compressed.init(1, 0); verify_changelog_files(); @@ -1228,7 +1255,7 @@ TEST_F(CoordinationTest, ChangelogTestMixedLogTypes) DB::KeeperLogStore changelog( DB::LogFileSettings{.force_sync = true, .compress_logs = false, .rotate_interval = 20}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); verify_changelog_files(); @@ -1246,16 +1273,16 @@ TEST_F(CoordinationTest, ChangelogTestMixedLogTypes) } } -TEST_P(CoordinationTest, ChangelogTestLostFiles) +TYPED_TEST(CoordinationTest, ChangelogTestLostFiles) { - auto params = GetParam(); + ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 20}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); for (size_t i = 0; i < 35; ++i) @@ -1266,30 +1293,30 @@ TEST_P(CoordinationTest, ChangelogTestLostFiles) changelog.end_of_append_batch(0, 0); waitDurableLogs(changelog); - EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin" + this->extension)); - fs::remove("./logs/changelog_1_20.bin" + params.extension); + fs::remove("./logs/changelog_1_20.bin" + this->extension); DB::KeeperLogStore changelog_reader( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 20}, DB::FlushSettings(), - keeper_context); + this->keeper_context); /// It should print error message, but still able to start changelog_reader.init(5, 0); - assertBrokenFileRemoved("./logs", "changelog_21_40.bin" + params.extension); + assertBrokenFileRemoved("./logs", "changelog_21_40.bin" + this->extension); } -TEST_P(CoordinationTest, ChangelogTestLostFiles2) +TYPED_TEST(CoordinationTest, ChangelogTestLostFiles2) { - auto params = GetParam(); + ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 10}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 10}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); for (size_t i = 0; i < 35; ++i) @@ -1301,24 +1328,24 @@ TEST_P(CoordinationTest, ChangelogTestLostFiles2) waitDurableLogs(changelog); - EXPECT_TRUE(fs::exists("./logs/changelog_1_10.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_11_20.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_21_30.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_31_40.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_10.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_11_20.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_21_30.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_31_40.bin" + this->extension)); // we have a gap in our logs, we need to remove all the logs after the gap - fs::remove("./logs/changelog_21_30.bin" + params.extension); + fs::remove("./logs/changelog_21_30.bin" + this->extension); DB::KeeperLogStore changelog_reader( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 10}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 10}, DB::FlushSettings(), - keeper_context); + this->keeper_context); /// It should print error message, but still able to start changelog_reader.init(5, 0); - EXPECT_TRUE(fs::exists("./logs/changelog_1_10.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_11_20.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_10.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_11_20.bin" + this->extension)); - assertBrokenFileRemoved("./logs", "changelog_31_40.bin" + params.extension); + assertBrokenFileRemoved("./logs", "changelog_31_40.bin" + this->extension); } struct IntNode { @@ -1334,7 +1361,7 @@ struct IntNode bool operator!=(const int & rhs) const { return rhs != this->value; } }; -TEST_P(CoordinationTest, SnapshotableHashMapSimple) +TYPED_TEST(CoordinationTest, SnapshotableHashMapSimple) { DB::SnapshotableHashTable hello; EXPECT_TRUE(hello.insert("hello", 5).second); @@ -1349,7 +1376,7 @@ TEST_P(CoordinationTest, SnapshotableHashMapSimple) EXPECT_EQ(hello.size(), 0); } -TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot) +TYPED_TEST(CoordinationTest, SnapshotableHashMapTrySnapshot) { DB::SnapshotableHashTable map_snp; EXPECT_TRUE(map_snp.insert("/hello", 7).second); @@ -1426,7 +1453,7 @@ TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot) map_snp.disableSnapshotMode(); } -TEST_P(CoordinationTest, SnapshotableHashMapDataSize) +TYPED_TEST(CoordinationTest, SnapshotableHashMapDataSize) { /// int DB::SnapshotableHashTable hello; @@ -1503,9 +1530,10 @@ TEST_P(CoordinationTest, SnapshotableHashMapDataSize) EXPECT_EQ(world.getApproximateDataSize(), 0); } -void addNode(DB::KeeperMemoryStorage & storage, const std::string & path, const std::string & data, int64_t ephemeral_owner = 0) +template +void addNode(Storage & storage, const std::string & path, const std::string & data, int64_t ephemeral_owner = 0) { - using Node = DB::KeeperMemoryStorage::Node; + using Node = typename Storage::Node; Node node{}; node.setData(data); node.setEphemeralOwner(ephemeral_owner); @@ -1521,15 +1549,20 @@ void addNode(DB::KeeperMemoryStorage & storage, const std::string & path, const }); } -TEST_P(CoordinationTest, TestStorageSnapshotSimple) +TYPED_TEST(CoordinationTest, TestStorageSnapshotSimple) { - auto params = GetParam(); + ChangelogDirTest test("./snapshots"); - setSnapshotDirectory("./snapshots"); + this->setSnapshotDirectory("./snapshots"); - DB::KeeperSnapshotManager manager(3, keeper_context, params.enable_compression); + using Storage = typename TestFixture::Storage; - DB::KeeperMemoryStorage storage(500, "", keeper_context); + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + + DB::KeeperSnapshotManager manager(3, this->keeper_context, this->enable_compression); + + Storage storage(500, "", this->keeper_context); addNode(storage, "/hello1", "world", 1); addNode(storage, "/hello2", "somedata", 3); storage.session_id_counter = 5; @@ -1539,7 +1572,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotSimple) storage.getSessionID(130); storage.getSessionID(130); - DB::KeeperStorageSnapshot snapshot(&storage, 2); + DB::KeeperStorageSnapshot snapshot(&storage, 2); EXPECT_EQ(snapshot.snapshot_meta->get_last_log_idx(), 2); EXPECT_EQ(snapshot.session_id, 7); @@ -1548,7 +1581,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotSimple) auto buf = manager.serializeSnapshotToBuffer(snapshot); manager.serializeSnapshotBufferToDisk(*buf, 2); - EXPECT_TRUE(fs::exists("./snapshots/snapshot_2.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./snapshots/snapshot_2.bin" + this->extension)); auto debuf = manager.deserializeSnapshotBufferFromDisk(2); @@ -1571,15 +1604,20 @@ TEST_P(CoordinationTest, TestStorageSnapshotSimple) EXPECT_EQ(restored_storage->session_and_timeout.size(), 2); } -TEST_P(CoordinationTest, TestStorageSnapshotMoreWrites) +TYPED_TEST(CoordinationTest, TestStorageSnapshotMoreWrites) { - auto params = GetParam(); + ChangelogDirTest test("./snapshots"); - setSnapshotDirectory("./snapshots"); + this->setSnapshotDirectory("./snapshots"); - DB::KeeperSnapshotManager manager(3, keeper_context, params.enable_compression); + using Storage = typename TestFixture::Storage; - DB::KeeperMemoryStorage storage(500, "", keeper_context); + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + + DB::KeeperSnapshotManager manager(3, this->keeper_context, this->enable_compression); + + Storage storage(500, "", this->keeper_context); storage.getSessionID(130); for (size_t i = 0; i < 50; ++i) @@ -1587,7 +1625,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotMoreWrites) addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i)); } - DB::KeeperStorageSnapshot snapshot(&storage, 50); + DB::KeeperStorageSnapshot snapshot(&storage, 50); EXPECT_EQ(snapshot.snapshot_meta->get_last_log_idx(), 50); EXPECT_EQ(snapshot.snapshot_container_size, 54); @@ -1600,7 +1638,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotMoreWrites) auto buf = manager.serializeSnapshotToBuffer(snapshot); manager.serializeSnapshotBufferToDisk(*buf, 50); - EXPECT_TRUE(fs::exists("./snapshots/snapshot_50.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./snapshots/snapshot_50.bin" + this->extension)); auto debuf = manager.deserializeSnapshotBufferFromDisk(50); @@ -1614,15 +1652,20 @@ TEST_P(CoordinationTest, TestStorageSnapshotMoreWrites) } -TEST_P(CoordinationTest, TestStorageSnapshotManySnapshots) +TYPED_TEST(CoordinationTest, TestStorageSnapshotManySnapshots) { - auto params = GetParam(); + ChangelogDirTest test("./snapshots"); - setSnapshotDirectory("./snapshots"); + this->setSnapshotDirectory("./snapshots"); - DB::KeeperSnapshotManager manager(3, keeper_context, params.enable_compression); + using Storage = typename TestFixture::Storage; - DB::KeeperMemoryStorage storage(500, "", keeper_context); + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + + DB::KeeperSnapshotManager manager(3, this->keeper_context, this->enable_compression); + + Storage storage(500, "", this->keeper_context); storage.getSessionID(130); for (size_t j = 1; j <= 5; ++j) @@ -1632,17 +1675,17 @@ TEST_P(CoordinationTest, TestStorageSnapshotManySnapshots) addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i)); } - DB::KeeperStorageSnapshot snapshot(&storage, j * 50); + DB::KeeperStorageSnapshot snapshot(&storage, j * 50); auto buf = manager.serializeSnapshotToBuffer(snapshot); manager.serializeSnapshotBufferToDisk(*buf, j * 50); - EXPECT_TRUE(fs::exists(std::string{"./snapshots/snapshot_"} + std::to_string(j * 50) + ".bin" + params.extension)); + EXPECT_TRUE(fs::exists(std::string{"./snapshots/snapshot_"} + std::to_string(j * 50) + ".bin" + this->extension)); } - EXPECT_FALSE(fs::exists("./snapshots/snapshot_50.bin" + params.extension)); - EXPECT_FALSE(fs::exists("./snapshots/snapshot_100.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./snapshots/snapshot_150.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./snapshots/snapshot_200.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./snapshots/snapshot_250.bin" + params.extension)); + EXPECT_FALSE(fs::exists("./snapshots/snapshot_50.bin" + this->extension)); + EXPECT_FALSE(fs::exists("./snapshots/snapshot_100.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./snapshots/snapshot_150.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./snapshots/snapshot_200.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./snapshots/snapshot_250.bin" + this->extension)); auto [restored_storage, meta, _] = manager.restoreFromLatestSnapshot(); @@ -1655,21 +1698,26 @@ TEST_P(CoordinationTest, TestStorageSnapshotManySnapshots) } } -TEST_P(CoordinationTest, TestStorageSnapshotMode) +TYPED_TEST(CoordinationTest, TestStorageSnapshotMode) { - auto params = GetParam(); - ChangelogDirTest test("./snapshots"); - setSnapshotDirectory("./snapshots"); - DB::KeeperSnapshotManager manager(3, keeper_context, params.enable_compression); - DB::KeeperMemoryStorage storage(500, "", keeper_context); + ChangelogDirTest test("./snapshots"); + this->setSnapshotDirectory("./snapshots"); + + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + + DB::KeeperSnapshotManager manager(3, this->keeper_context, this->enable_compression); + Storage storage(500, "", this->keeper_context); for (size_t i = 0; i < 50; ++i) { addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i)); } { - DB::KeeperStorageSnapshot snapshot(&storage, 50); + DB::KeeperStorageSnapshot snapshot(&storage, 50); for (size_t i = 0; i < 50; ++i) { addNode(storage, "/hello_" + std::to_string(i), "wlrd_" + std::to_string(i)); @@ -1684,12 +1732,15 @@ TEST_P(CoordinationTest, TestStorageSnapshotMode) storage.container.erase("/hello_" + std::to_string(i)); } EXPECT_EQ(storage.container.size(), 29); - EXPECT_EQ(storage.container.snapshotSizeWithVersion().first, 105); + if constexpr (Storage::use_rocksdb) + EXPECT_EQ(storage.container.snapshotSizeWithVersion().first, 54); + else + EXPECT_EQ(storage.container.snapshotSizeWithVersion().first, 105); EXPECT_EQ(storage.container.snapshotSizeWithVersion().second, 1); auto buf = manager.serializeSnapshotToBuffer(snapshot); manager.serializeSnapshotBufferToDisk(*buf, 50); } - EXPECT_TRUE(fs::exists("./snapshots/snapshot_50.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./snapshots/snapshot_50.bin" + this->extension)); EXPECT_EQ(storage.container.size(), 29); storage.clearGarbageAfterSnapshot(); EXPECT_EQ(storage.container.snapshotSizeWithVersion().first, 29); @@ -1709,28 +1760,33 @@ TEST_P(CoordinationTest, TestStorageSnapshotMode) } } -TEST_P(CoordinationTest, TestStorageSnapshotBroken) +TYPED_TEST(CoordinationTest, TestStorageSnapshotBroken) { - auto params = GetParam(); - ChangelogDirTest test("./snapshots"); - setSnapshotDirectory("./snapshots"); - DB::KeeperSnapshotManager manager(3, keeper_context, params.enable_compression); - DB::KeeperMemoryStorage storage(500, "", keeper_context); + ChangelogDirTest test("./snapshots"); + this->setSnapshotDirectory("./snapshots"); + + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + + DB::KeeperSnapshotManager manager(3, this->keeper_context, this->enable_compression); + Storage storage(500, "", this->keeper_context); for (size_t i = 0; i < 50; ++i) { addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i)); } { - DB::KeeperStorageSnapshot snapshot(&storage, 50); + DB::KeeperStorageSnapshot snapshot(&storage, 50); auto buf = manager.serializeSnapshotToBuffer(snapshot); manager.serializeSnapshotBufferToDisk(*buf, 50); } - EXPECT_TRUE(fs::exists("./snapshots/snapshot_50.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./snapshots/snapshot_50.bin" + this->extension)); /// Let's corrupt file DB::WriteBufferFromFile plain_buf( - "./snapshots/snapshot_50.bin" + params.extension, DB::DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY); + "./snapshots/snapshot_50.bin" + this->extension, DB::DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY); plain_buf.truncate(34); plain_buf.sync(); @@ -1757,6 +1813,7 @@ getLogEntryFromZKRequest(size_t term, int64_t session_id, int64_t zxid, const Co return nuraft::cs_new(term, buffer); } +template void testLogAndStateMachine( DB::CoordinationSettingsPtr settings, uint64_t total_logs, @@ -1767,12 +1824,15 @@ void testLogAndStateMachine( ChangelogDirTest snapshots("./snapshots"); ChangelogDirTest logs("./logs"); + ChangelogDirTest rocks("./rocksdb"); auto get_keeper_context = [&] { auto local_keeper_context = std::make_shared(true, settings); local_keeper_context->setSnapshotDisk(std::make_shared("SnapshotDisk", "./snapshots")); local_keeper_context->setLogDisk(std::make_shared("LogDisk", "./logs")); + local_keeper_context->setRocksDBDisk(std::make_shared("RocksDisk", "./rocksdb")); + local_keeper_context->setRocksDBOptions(); return local_keeper_context; }; @@ -1780,7 +1840,7 @@ void testLogAndStateMachine( SnapshotsQueue snapshots_queue{1}; auto keeper_context = get_keeper_context(); - auto state_machine = std::make_shared>(queue, snapshots_queue, keeper_context, nullptr); + auto state_machine = std::make_shared>(queue, snapshots_queue, keeper_context, nullptr); state_machine->init(); DB::KeeperLogStore changelog( @@ -1827,7 +1887,7 @@ void testLogAndStateMachine( SnapshotsQueue snapshots_queue1{1}; keeper_context = get_keeper_context(); - auto restore_machine = std::make_shared>(queue, snapshots_queue1, keeper_context, nullptr); + auto restore_machine = std::make_shared>(queue, snapshots_queue1, keeper_context, nullptr); restore_machine->init(); EXPECT_EQ(restore_machine->last_commit_index(), total_logs - total_logs % settings->snapshot_distance); @@ -1863,11 +1923,12 @@ void testLogAndStateMachine( } } -TEST_P(CoordinationTest, TestStateMachineAndLogStore) +TYPED_TEST(CoordinationTest, TestStateMachineAndLogStore) { using namespace Coordination; using namespace DB; - auto params = GetParam(); + + using Storage = typename TestFixture::Storage; { CoordinationSettingsPtr settings = std::make_shared(); @@ -1875,78 +1936,83 @@ TEST_P(CoordinationTest, TestStateMachineAndLogStore) settings->reserved_log_items = 10; settings->rotate_log_storage_interval = 10; - testLogAndStateMachine(settings, 37, params.enable_compression); + testLogAndStateMachine(settings, 37, this->enable_compression); } { CoordinationSettingsPtr settings = std::make_shared(); settings->snapshot_distance = 10; settings->reserved_log_items = 10; settings->rotate_log_storage_interval = 10; - testLogAndStateMachine(settings, 11, params.enable_compression); + testLogAndStateMachine(settings, 11, this->enable_compression); } { CoordinationSettingsPtr settings = std::make_shared(); settings->snapshot_distance = 10; settings->reserved_log_items = 10; settings->rotate_log_storage_interval = 10; - testLogAndStateMachine(settings, 40, params.enable_compression); + testLogAndStateMachine(settings, 40, this->enable_compression); } { CoordinationSettingsPtr settings = std::make_shared(); settings->snapshot_distance = 10; settings->reserved_log_items = 20; settings->rotate_log_storage_interval = 30; - testLogAndStateMachine(settings, 40, params.enable_compression); + testLogAndStateMachine(settings, 40, this->enable_compression); } { CoordinationSettingsPtr settings = std::make_shared(); settings->snapshot_distance = 10; settings->reserved_log_items = 0; settings->rotate_log_storage_interval = 10; - testLogAndStateMachine(settings, 40, params.enable_compression); + testLogAndStateMachine(settings, 40, this->enable_compression); } { CoordinationSettingsPtr settings = std::make_shared(); settings->snapshot_distance = 1; settings->reserved_log_items = 1; settings->rotate_log_storage_interval = 32; - testLogAndStateMachine(settings, 32, params.enable_compression); + testLogAndStateMachine(settings, 32, this->enable_compression); } { CoordinationSettingsPtr settings = std::make_shared(); settings->snapshot_distance = 10; settings->reserved_log_items = 7; settings->rotate_log_storage_interval = 1; - testLogAndStateMachine(settings, 33, params.enable_compression); + testLogAndStateMachine(settings, 33, this->enable_compression); } { CoordinationSettingsPtr settings = std::make_shared(); settings->snapshot_distance = 37; settings->reserved_log_items = 1000; settings->rotate_log_storage_interval = 5000; - testLogAndStateMachine(settings, 33, params.enable_compression); + testLogAndStateMachine(settings, 33, this->enable_compression); } { CoordinationSettingsPtr settings = std::make_shared(); settings->snapshot_distance = 37; settings->reserved_log_items = 1000; settings->rotate_log_storage_interval = 5000; - testLogAndStateMachine(settings, 45, params.enable_compression); + testLogAndStateMachine(settings, 45, this->enable_compression); } } -TEST_P(CoordinationTest, TestEphemeralNodeRemove) +TYPED_TEST(CoordinationTest, TestEphemeralNodeRemove) { using namespace Coordination; using namespace DB; ChangelogDirTest snapshots("./snapshots"); - setSnapshotDirectory("./snapshots"); + this->setSnapshotDirectory("./snapshots"); + + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); ResponsesQueue queue(std::numeric_limits::max()); SnapshotsQueue snapshots_queue{1}; - auto state_machine = std::make_shared>(queue, snapshots_queue, keeper_context, nullptr); + auto state_machine = std::make_shared>(queue, snapshots_queue, this->keeper_context, nullptr); state_machine->init(); std::shared_ptr request_c = std::make_shared(); @@ -1969,17 +2035,23 @@ TEST_P(CoordinationTest, TestEphemeralNodeRemove) } -TEST_P(CoordinationTest, TestCreateNodeWithAuthSchemeForAclWhenAuthIsPrecommitted) +TYPED_TEST(CoordinationTest, TestCreateNodeWithAuthSchemeForAclWhenAuthIsPrecommitted) { using namespace Coordination; using namespace DB; ChangelogDirTest snapshots("./snapshots"); - setSnapshotDirectory("./snapshots"); + this->setSnapshotDirectory("./snapshots"); + + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + ResponsesQueue queue(std::numeric_limits::max()); SnapshotsQueue snapshots_queue{1}; - auto state_machine = std::make_shared>(queue, snapshots_queue, keeper_context, nullptr); + auto state_machine = std::make_shared>(queue, snapshots_queue, this->keeper_context, nullptr); state_machine->init(); String user_auth_data = "test_user:test_password"; @@ -2019,18 +2091,18 @@ TEST_P(CoordinationTest, TestCreateNodeWithAuthSchemeForAclWhenAuthIsPrecommitte EXPECT_EQ(acls[0].permissions, 31); } -TEST_P(CoordinationTest, TestSetACLWithAuthSchemeForAclWhenAuthIsPrecommitted) +TYPED_TEST(CoordinationTest, TestSetACLWithAuthSchemeForAclWhenAuthIsPrecommitted) { using namespace Coordination; using namespace DB; ChangelogDirTest snapshots("./snapshots"); - setSnapshotDirectory("./snapshots"); + this->setSnapshotDirectory("./snapshots"); ResponsesQueue queue(std::numeric_limits::max()); SnapshotsQueue snapshots_queue{1}; - auto state_machine = std::make_shared>(queue, snapshots_queue, keeper_context, nullptr); + auto state_machine = std::make_shared>(queue, snapshots_queue, this->keeper_context, nullptr); state_machine->init(); String user_auth_data = "test_user:test_password"; @@ -2077,17 +2149,17 @@ TEST_P(CoordinationTest, TestSetACLWithAuthSchemeForAclWhenAuthIsPrecommitted) } -TEST_P(CoordinationTest, TestRotateIntervalChanges) +TYPED_TEST(CoordinationTest, TestRotateIntervalChanges) { using namespace Coordination; - auto params = GetParam(); + ChangelogDirTest snapshots("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); { DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(0, 3); for (size_t i = 1; i < 55; ++i) @@ -2103,12 +2175,12 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges) } - EXPECT_TRUE(fs::exists("./logs/changelog_1_100.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_100.bin" + this->extension)); DB::KeeperLogStore changelog_1( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 10}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 10}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog_1.init(0, 50); for (size_t i = 0; i < 55; ++i) { @@ -2121,13 +2193,13 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges) waitDurableLogs(changelog_1); - EXPECT_TRUE(fs::exists("./logs/changelog_1_100.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_101_110.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_100.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_101_110.bin" + this->extension)); DB::KeeperLogStore changelog_2( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 7}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 7}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog_2.init(98, 55); for (size_t i = 0; i < 17; ++i) @@ -2141,20 +2213,20 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges) waitDurableLogs(changelog_2); - keeper_context->setLastCommitIndex(105); + this->keeper_context->setLastCommitIndex(105); changelog_2.compact(105); std::this_thread::sleep_for(std::chrono::microseconds(1000)); - assertFileDeleted("./logs/changelog_1_100.bin" + params.extension); - EXPECT_TRUE(fs::exists("./logs/changelog_101_110.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_111_117.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_118_124.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_125_131.bin" + params.extension)); + assertFileDeleted("./logs/changelog_1_100.bin" + this->extension); + EXPECT_TRUE(fs::exists("./logs/changelog_101_110.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_111_117.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_118_124.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_125_131.bin" + this->extension)); DB::KeeperLogStore changelog_3( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 5}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog_3.init(116, 3); for (size_t i = 0; i < 17; ++i) { @@ -2167,20 +2239,20 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges) waitDurableLogs(changelog_3); - keeper_context->setLastCommitIndex(125); + this->keeper_context->setLastCommitIndex(125); changelog_3.compact(125); std::this_thread::sleep_for(std::chrono::microseconds(1000)); - assertFileDeleted("./logs/changelog_101_110.bin" + params.extension); - assertFileDeleted("./logs/changelog_111_117.bin" + params.extension); - assertFileDeleted("./logs/changelog_118_124.bin" + params.extension); + assertFileDeleted("./logs/changelog_101_110.bin" + this->extension); + assertFileDeleted("./logs/changelog_111_117.bin" + this->extension); + assertFileDeleted("./logs/changelog_118_124.bin" + this->extension); - EXPECT_TRUE(fs::exists("./logs/changelog_125_131.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_132_136.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_137_141.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_142_146.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_125_131.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_132_136.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_137_141.bin" + this->extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_142_146.bin" + this->extension)); } -TEST_P(CoordinationTest, TestSessionExpiryQueue) +TYPED_TEST(CoordinationTest, TestSessionExpiryQueue) { using namespace Coordination; SessionExpiryQueue queue(500); @@ -2198,16 +2270,15 @@ TEST_P(CoordinationTest, TestSessionExpiryQueue) } -TEST_P(CoordinationTest, TestCompressedLogsMultipleRewrite) +TYPED_TEST(CoordinationTest, TestCompressedLogsMultipleRewrite) { using namespace Coordination; - auto test_params = GetParam(); ChangelogDirTest logs("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = test_params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(0, 3); for (size_t i = 1; i < 55; ++i) @@ -2222,9 +2293,9 @@ TEST_P(CoordinationTest, TestCompressedLogsMultipleRewrite) waitDurableLogs(changelog); DB::KeeperLogStore changelog1( - DB::LogFileSettings{.force_sync = true, .compress_logs = test_params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog1.init(0, 3); for (size_t i = 55; i < 70; ++i) { @@ -2238,9 +2309,9 @@ TEST_P(CoordinationTest, TestCompressedLogsMultipleRewrite) waitDurableLogs(changelog1); DB::KeeperLogStore changelog2( - DB::LogFileSettings{.force_sync = true, .compress_logs = test_params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog2.init(0, 3); for (size_t i = 70; i < 80; ++i) { @@ -2252,16 +2323,21 @@ TEST_P(CoordinationTest, TestCompressedLogsMultipleRewrite) } } -TEST_P(CoordinationTest, TestStorageSnapshotDifferentCompressions) +TYPED_TEST(CoordinationTest, TestStorageSnapshotDifferentCompressions) { - auto params = GetParam(); + ChangelogDirTest test("./snapshots"); - setSnapshotDirectory("./snapshots"); + this->setSnapshotDirectory("./snapshots"); - DB::KeeperSnapshotManager manager(3, keeper_context, params.enable_compression); + using Storage = typename TestFixture::Storage; - DB::KeeperMemoryStorage storage(500, "", keeper_context); + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + + DB::KeeperSnapshotManager manager(3, this->keeper_context, this->enable_compression); + + Storage storage(500, "", this->keeper_context); addNode(storage, "/hello1", "world", 1); addNode(storage, "/hello2", "somedata", 3); storage.session_id_counter = 5; @@ -2271,13 +2347,13 @@ TEST_P(CoordinationTest, TestStorageSnapshotDifferentCompressions) storage.getSessionID(130); storage.getSessionID(130); - DB::KeeperStorageSnapshot snapshot(&storage, 2); + DB::KeeperStorageSnapshot snapshot(&storage, 2); auto buf = manager.serializeSnapshotToBuffer(snapshot); manager.serializeSnapshotBufferToDisk(*buf, 2); - EXPECT_TRUE(fs::exists("./snapshots/snapshot_2.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./snapshots/snapshot_2.bin" + this->extension)); - DB::KeeperSnapshotManager new_manager(3, keeper_context, !params.enable_compression); + DB::KeeperSnapshotManager new_manager(3, this->keeper_context, !this->enable_compression); auto debuf = new_manager.deserializeSnapshotBufferFromDisk(2); @@ -2299,17 +2375,17 @@ TEST_P(CoordinationTest, TestStorageSnapshotDifferentCompressions) EXPECT_EQ(restored_storage->session_and_timeout.size(), 2); } -TEST_P(CoordinationTest, ChangelogInsertThreeTimesSmooth) +TYPED_TEST(CoordinationTest, ChangelogInsertThreeTimesSmooth) { - auto params = GetParam(); + ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); { - LOG_INFO(log, "================First time====================="); + LOG_INFO(this->log, "================First time====================="); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); auto entry = getLogEntry("hello_world", 1000); changelog.append(entry); @@ -2319,11 +2395,11 @@ TEST_P(CoordinationTest, ChangelogInsertThreeTimesSmooth) } { - LOG_INFO(log, "================Second time====================="); + LOG_INFO(this->log, "================Second time====================="); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); auto entry = getLogEntry("hello_world", 1000); changelog.append(entry); @@ -2333,11 +2409,11 @@ TEST_P(CoordinationTest, ChangelogInsertThreeTimesSmooth) } { - LOG_INFO(log, "================Third time====================="); + LOG_INFO(this->log, "================Third time====================="); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); auto entry = getLogEntry("hello_world", 1000); changelog.append(entry); @@ -2347,11 +2423,11 @@ TEST_P(CoordinationTest, ChangelogInsertThreeTimesSmooth) } { - LOG_INFO(log, "================Fourth time====================="); + LOG_INFO(this->log, "================Fourth time====================="); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); auto entry = getLogEntry("hello_world", 1000); changelog.append(entry); @@ -2362,18 +2438,18 @@ TEST_P(CoordinationTest, ChangelogInsertThreeTimesSmooth) } -TEST_P(CoordinationTest, ChangelogInsertMultipleTimesSmooth) +TYPED_TEST(CoordinationTest, ChangelogInsertMultipleTimesSmooth) { - auto params = GetParam(); + ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); for (size_t i = 0; i < 36; ++i) { - LOG_INFO(log, "================First time====================="); + LOG_INFO(this->log, "================First time====================="); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); for (size_t j = 0; j < 7; ++j) { @@ -2385,24 +2461,24 @@ TEST_P(CoordinationTest, ChangelogInsertMultipleTimesSmooth) } DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); EXPECT_EQ(changelog.next_slot(), 36 * 7 + 1); } -TEST_P(CoordinationTest, ChangelogInsertThreeTimesHard) +TYPED_TEST(CoordinationTest, ChangelogInsertThreeTimesHard) { - auto params = GetParam(); + ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); { - LOG_INFO(log, "================First time====================="); + LOG_INFO(this->log, "================First time====================="); DB::KeeperLogStore changelog1( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog1.init(1, 0); auto entry = getLogEntry("hello_world", 1000); changelog1.append(entry); @@ -2412,11 +2488,11 @@ TEST_P(CoordinationTest, ChangelogInsertThreeTimesHard) } { - LOG_INFO(log, "================Second time====================="); + LOG_INFO(this->log, "================Second time====================="); DB::KeeperLogStore changelog2( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog2.init(1, 0); auto entry = getLogEntry("hello_world", 1000); changelog2.append(entry); @@ -2426,11 +2502,11 @@ TEST_P(CoordinationTest, ChangelogInsertThreeTimesHard) } { - LOG_INFO(log, "================Third time====================="); + LOG_INFO(this->log, "================Third time====================="); DB::KeeperLogStore changelog3( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog3.init(1, 0); auto entry = getLogEntry("hello_world", 1000); changelog3.append(entry); @@ -2440,11 +2516,11 @@ TEST_P(CoordinationTest, ChangelogInsertThreeTimesHard) } { - LOG_INFO(log, "================Fourth time====================="); + LOG_INFO(this->log, "================Fourth time====================="); DB::KeeperLogStore changelog4( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog4.init(1, 0); auto entry = getLogEntry("hello_world", 1000); changelog4.append(entry); @@ -2454,18 +2530,23 @@ TEST_P(CoordinationTest, ChangelogInsertThreeTimesHard) } } -TEST_P(CoordinationTest, TestStorageSnapshotEqual) +TYPED_TEST(CoordinationTest, TestStorageSnapshotEqual) { - auto params = GetParam(); + ChangelogDirTest test("./snapshots"); - setSnapshotDirectory("./snapshots"); + this->setSnapshotDirectory("./snapshots"); + + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); std::optional snapshot_hash; for (size_t i = 0; i < 15; ++i) { - DB::KeeperSnapshotManager manager(3, keeper_context, params.enable_compression); + DB::KeeperSnapshotManager manager(3, this->keeper_context, this->enable_compression); - DB::KeeperMemoryStorage storage(500, "", keeper_context); + Storage storage(500, "", this->keeper_context); addNode(storage, "/hello", ""); for (size_t j = 0; j < 5000; ++j) { @@ -2481,7 +2562,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotEqual) for (size_t j = 0; j < 3333; ++j) storage.getSessionID(130 * j); - DB::KeeperStorageSnapshot snapshot(&storage, storage.zxid); + DB::KeeperStorageSnapshot snapshot(&storage, storage.zxid); auto buf = manager.serializeSnapshotToBuffer(snapshot); @@ -2498,17 +2579,16 @@ TEST_P(CoordinationTest, TestStorageSnapshotEqual) } -TEST_P(CoordinationTest, TestLogGap) +TYPED_TEST(CoordinationTest, TestLogGap) { using namespace Coordination; - auto test_params = GetParam(); ChangelogDirTest logs("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = test_params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(0, 3); for (size_t i = 1; i < 55; ++i) @@ -2521,13 +2601,13 @@ TEST_P(CoordinationTest, TestLogGap) } DB::KeeperLogStore changelog1( - DB::LogFileSettings{.force_sync = true, .compress_logs = test_params.enable_compression, .rotate_interval = 100}, + DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog1.init(61, 3); /// Logs discarded - EXPECT_FALSE(fs::exists("./logs/changelog_1_100.bin" + test_params.extension)); + EXPECT_FALSE(fs::exists("./logs/changelog_1_100.bin" + this->extension)); EXPECT_EQ(changelog1.start_index(), 61); EXPECT_EQ(changelog1.next_slot(), 61); } @@ -2539,12 +2619,17 @@ ResponseType getSingleResponse(const auto & responses) return dynamic_cast(*responses[0].response); } -TEST_P(CoordinationTest, TestUncommittedStateBasicCrud) +TYPED_TEST(CoordinationTest, TestUncommittedStateBasicCrud) { using namespace DB; using namespace Coordination; - DB::KeeperMemoryStorage storage{500, "", keeper_context}; + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + + Storage storage{500, "", this->keeper_context}; constexpr std::string_view path = "/test"; @@ -2656,12 +2741,17 @@ TEST_P(CoordinationTest, TestUncommittedStateBasicCrud) ASSERT_FALSE(get_committed_data()); } -TEST_P(CoordinationTest, TestListRequestTypes) +TYPED_TEST(CoordinationTest, TestListRequestTypes) { using namespace DB; using namespace Coordination; - KeeperMemoryStorage storage{500, "", keeper_context}; + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + + Storage storage{500, "", this->keeper_context}; int32_t zxid = 0; @@ -2738,18 +2828,18 @@ TEST_P(CoordinationTest, TestListRequestTypes) } } -TEST_P(CoordinationTest, TestDurableState) +TYPED_TEST(CoordinationTest, TestDurableState) { ChangelogDirTest logs("./logs"); - setLogDirectory("./logs"); - setStateFileDirectory("."); + this->setLogDirectory("./logs"); + this->setStateFileDirectory("."); auto state = nuraft::cs_new(); std::optional state_manager; const auto reload_state_manager = [&] { - state_manager.emplace(1, "localhost", 9181, keeper_context); + state_manager.emplace(1, "localhost", 9181, this->keeper_context); state_manager->loadLogStore(1, 0); }; @@ -2812,10 +2902,15 @@ TEST_P(CoordinationTest, TestDurableState) } } -TEST_P(CoordinationTest, TestFeatureFlags) +TYPED_TEST(CoordinationTest, TestFeatureFlags) { using namespace Coordination; - KeeperMemoryStorage storage{500, "", keeper_context}; + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + + Storage storage{500, "", this->keeper_context}; auto request = std::make_shared(); request->path = DB::keeper_api_feature_flags_path; auto responses = storage.processRequest(request, 0, std::nullopt, true, true); @@ -2827,14 +2922,19 @@ TEST_P(CoordinationTest, TestFeatureFlags) ASSERT_FALSE(feature_flags.isEnabled(KeeperFeatureFlag::CHECK_NOT_EXISTS)); } -TEST_P(CoordinationTest, TestSystemNodeModify) +TYPED_TEST(CoordinationTest, TestSystemNodeModify) { using namespace Coordination; int64_t zxid{0}; + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + // On INIT we abort when a system path is modified - keeper_context->setServerState(KeeperContext::Phase::RUNNING); - KeeperMemoryStorage storage{500, "", keeper_context}; + this->keeper_context->setServerState(KeeperContext::Phase::RUNNING); + Storage storage{500, "", this->keeper_context}; const auto assert_create = [&](const std::string_view path, const auto expected_code) { auto request = std::make_shared(); @@ -2859,11 +2959,11 @@ TEST_P(CoordinationTest, TestSystemNodeModify) assert_create("/keeper1/test", Error::ZOK); } -TEST_P(CoordinationTest, ChangelogTestMaxLogSize) +TYPED_TEST(CoordinationTest, ChangelogTestMaxLogSize) { - auto params = GetParam(); + ChangelogDirTest test("./logs"); - setLogDirectory("./logs"); + this->setLogDirectory("./logs"); uint64_t last_entry_index{0}; size_t i{0}; @@ -2871,9 +2971,9 @@ TEST_P(CoordinationTest, ChangelogTestMaxLogSize) SCOPED_TRACE("Small rotation interval, big size limit"); DB::KeeperLogStore changelog( DB::LogFileSettings{ - .force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20, .max_size = 50 * 1024 * 1024}, + .force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 20, .max_size = 50 * 1024 * 1024}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); for (; i < 100; ++i) @@ -2891,9 +2991,9 @@ TEST_P(CoordinationTest, ChangelogTestMaxLogSize) SCOPED_TRACE("Large rotation interval, small size limit"); DB::KeeperLogStore changelog( DB::LogFileSettings{ - .force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100'000, .max_size = 4000}, + .force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100'000, .max_size = 4000}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); ASSERT_EQ(changelog.entry_at(last_entry_index)->get_term(), (i - 1 + 44) * 10); @@ -2913,20 +3013,25 @@ TEST_P(CoordinationTest, ChangelogTestMaxLogSize) SCOPED_TRACE("Final verify all logs"); DB::KeeperLogStore changelog( DB::LogFileSettings{ - .force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 100'000, .max_size = 4000}, + .force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 100'000, .max_size = 4000}, DB::FlushSettings(), - keeper_context); + this->keeper_context); changelog.init(1, 0); ASSERT_EQ(changelog.entry_at(last_entry_index)->get_term(), (i - 1 + 44) * 10); } } -TEST_P(CoordinationTest, TestCheckNotExistsRequest) +TYPED_TEST(CoordinationTest, TestCheckNotExistsRequest) { using namespace DB; using namespace Coordination; - KeeperMemoryStorage storage{500, "", keeper_context}; + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + + Storage storage{500, "", this->keeper_context}; int32_t zxid = 0; @@ -2994,18 +3099,23 @@ TEST_P(CoordinationTest, TestCheckNotExistsRequest) } } -TEST_P(CoordinationTest, TestReapplyingDeltas) +TYPED_TEST(CoordinationTest, TestReapplyingDeltas) { using namespace DB; using namespace Coordination; + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + static constexpr int64_t initial_zxid = 100; const auto create_request = std::make_shared(); create_request->path = "/test/data"; create_request->is_sequential = true; - const auto process_create = [](KeeperMemoryStorage & storage, const auto & request, int64_t zxid) + const auto process_create = [](Storage & storage, const auto & request, int64_t zxid) { storage.preprocessRequest(request, 1, 0, zxid); auto responses = storage.processRequest(request, 1, zxid); @@ -3026,19 +3136,19 @@ TEST_P(CoordinationTest, TestReapplyingDeltas) process_create(storage, create_request, zxid); }; - KeeperMemoryStorage storage1{500, "", keeper_context}; + Storage storage1{500, "", this->keeper_context}; commit_initial_data(storage1); for (int64_t zxid = initial_zxid + 1; zxid < initial_zxid + 50; ++zxid) storage1.preprocessRequest(create_request, 1, 0, zxid, /*check_acl=*/true, /*digest=*/std::nullopt, /*log_idx=*/zxid); /// create identical new storage - KeeperMemoryStorage storage2{500, "", keeper_context}; + Storage storage2{500, "", this->keeper_context}; commit_initial_data(storage2); storage1.applyUncommittedState(storage2, initial_zxid); - const auto commit_unprocessed = [&](KeeperMemoryStorage & storage) + const auto commit_unprocessed = [&](Storage & storage) { for (int64_t zxid = initial_zxid + 1; zxid < initial_zxid + 50; ++zxid) { @@ -3051,7 +3161,7 @@ TEST_P(CoordinationTest, TestReapplyingDeltas) commit_unprocessed(storage1); commit_unprocessed(storage2); - const auto get_children = [&](KeeperMemoryStorage & storage) + const auto get_children = [&](Storage & storage) { const auto list_request = std::make_shared(); list_request->path = "/test"; @@ -3071,8 +3181,8 @@ TEST_P(CoordinationTest, TestReapplyingDeltas) ASSERT_TRUE(children1_set == children2_set); } -INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite, - CoordinationTest, - ::testing::ValuesIn(std::initializer_list{CompressionParam{true, ".zstd"}, CompressionParam{false, ""}})); +/// INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite, +/// CoordinationTest, +/// ::testing::ValuesIn(std::initializer_list{CompressionParam{true, ".zstd"}, CompressionParam{false, ""}})); #endif diff --git a/src/Coordination/tests/gtest_rocks_keeper.cpp b/src/Coordination/tests/gtest_rocks_keeper.cpp deleted file mode 100644 index d42eb66d683..00000000000 --- a/src/Coordination/tests/gtest_rocks_keeper.cpp +++ /dev/null @@ -1,1150 +0,0 @@ -#include -#include "config.h" - -#if USE_NURAFT and USE_ROCKSDB - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace fs = std::filesystem; -struct ChangelogDirTest -{ - std::string path; - bool drop; - explicit ChangelogDirTest(std::string path_, bool drop_ = true) : path(path_), drop(drop_) - { - EXPECT_FALSE(fs::exists(path)) << "Path " << path << " already exists, remove it to run test"; - fs::create_directory(path); - } - - ~ChangelogDirTest() - { - if (fs::exists(path) && drop) - fs::remove_all(path); - } -}; - -class RocksKeeperTest : public ::testing::Test -{ -protected: - DB::KeeperContextPtr keeper_context; - LoggerPtr log{getLogger("RocksKeeperTest")}; - - void SetUp() override - { - Poco::AutoPtr channel(new Poco::ConsoleChannel(std::cerr)); - Poco::Logger::root().setChannel(channel); - Poco::Logger::root().setLevel("trace"); - - auto settings = std::make_shared(); - settings->use_rocksdb = true; - keeper_context = std::make_shared(true, settings); - keeper_context->setLocalLogsPreprocessed(); - keeper_context->setRocksDBOptions(nullptr); - } - - void setSnapshotDirectory(const std::string & path) - { - keeper_context->setSnapshotDisk(std::make_shared("SnapshotDisk", path)); - } - - void setRocksDBDirectory(const std::string & path) - { - keeper_context->setRocksDBDisk(std::make_shared("RocksDisk", path)); - } -}; - -void addNode(DB::KeeperRocksStorage & storage, const std::string & path, const std::string & data, int64_t ephemeral_owner = 0) -{ - using Node = DB::KeeperRocksStorage::Node; - Node node{}; - node.setData(data); - node.setEphemeralOwner(ephemeral_owner); - storage.container.insertOrReplace(path, node); - storage.container.updateValue( - DB::parentNodePath(StringRef{path}), - [&](auto & parent) - { - parent.increaseNumChildren(); - }); -} - -namespace -{ -void waitDurableLogs(nuraft::log_store & log_store) -{ - while (log_store.last_durable_index() != log_store.next_slot() - 1) - std::this_thread::sleep_for(std::chrono::milliseconds(200)); -} -} - -TEST_F(RocksKeeperTest, TestStorageSnapshotSimple) -{ - ChangelogDirTest test("./snapshots"); - ChangelogDirTest rocks("./rocksdb"); - setSnapshotDirectory("./snapshots"); - setRocksDBDirectory("./rocksdb"); - - DB::KeeperSnapshotManager manager(3, keeper_context); - - DB::KeeperRocksStorage storage(500, "", keeper_context); - addNode(storage, "/hello1", "world", 1); - addNode(storage, "/hello2", "somedata", 3); - storage.session_id_counter = 5; - storage.zxid = 2; - storage.ephemerals[3] = {"/hello2"}; - storage.ephemerals[1] = {"/hello1"}; - storage.getSessionID(130); - storage.getSessionID(130); - - DB::KeeperStorageSnapshot snapshot(&storage, 2); - - EXPECT_EQ(snapshot.snapshot_meta->get_last_log_idx(), 2); - EXPECT_EQ(snapshot.session_id, 7); - EXPECT_EQ(snapshot.snapshot_container_size, 6); - EXPECT_EQ(snapshot.session_and_timeout.size(), 2); - - auto buf = manager.serializeSnapshotToBuffer(snapshot); - manager.serializeSnapshotBufferToDisk(*buf, 2); - EXPECT_TRUE(fs::exists("./snapshots/snapshot_2.bin.zstd")); - - - auto debuf = manager.deserializeSnapshotBufferFromDisk(2); - - auto [restored_storage, snapshot_meta, _] = manager.deserializeSnapshotFromBuffer(debuf); - - EXPECT_EQ(restored_storage->container.size(), 6); - EXPECT_EQ(restored_storage->container.find("/")->value.numChildren(), 3); - EXPECT_EQ(restored_storage->container.find("/hello1")->value.numChildren(), 0); - EXPECT_EQ(restored_storage->container.find("/hello2")->value.numChildren(), 0); - - EXPECT_EQ(restored_storage->container.find("/")->value.getData(), ""); - EXPECT_EQ(restored_storage->container.find("/hello1")->value.getData(), "world"); - EXPECT_EQ(restored_storage->container.find("/hello2")->value.getData(), "somedata"); - EXPECT_EQ(restored_storage->session_id_counter, 7); - EXPECT_EQ(restored_storage->zxid, 2); - EXPECT_EQ(restored_storage->ephemerals.size(), 2); - EXPECT_EQ(restored_storage->ephemerals[3].size(), 1); - EXPECT_EQ(restored_storage->ephemerals[1].size(), 1); - EXPECT_EQ(restored_storage->session_and_timeout.size(), 2); -} - -TEST_F(RocksKeeperTest, TestStorageSnapshotMoreWrites) -{ - ChangelogDirTest test("./snapshots"); - ChangelogDirTest rocks("./rocksdb"); - setSnapshotDirectory("./snapshots"); - setRocksDBDirectory("./rocksdb"); - - DB::KeeperSnapshotManager manager(3, keeper_context); - - DB::KeeperRocksStorage storage(500, "", keeper_context); - storage.getSessionID(130); - - for (size_t i = 0; i < 50; ++i) - { - addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i)); - } - - DB::KeeperStorageSnapshot snapshot(&storage, 50); - EXPECT_EQ(snapshot.snapshot_meta->get_last_log_idx(), 50); - EXPECT_EQ(snapshot.snapshot_container_size, 54); - - for (size_t i = 50; i < 100; ++i) - { - addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i)); - } - - EXPECT_EQ(storage.container.size(), 104); - - auto buf = manager.serializeSnapshotToBuffer(snapshot); - manager.serializeSnapshotBufferToDisk(*buf, 50); - EXPECT_TRUE(fs::exists("./snapshots/snapshot_50.bin.zstd")); - - - auto debuf = manager.deserializeSnapshotBufferFromDisk(50); - auto [restored_storage, meta, _] = manager.deserializeSnapshotFromBuffer(debuf); - - EXPECT_EQ(restored_storage->container.size(), 54); - for (size_t i = 0; i < 50; ++i) - { - EXPECT_EQ(restored_storage->container.find("/hello_" + std::to_string(i))->value.getData(), "world_" + std::to_string(i)); - } -} - -TEST_F(RocksKeeperTest, TestStorageSnapshotManySnapshots) -{ - ChangelogDirTest test("./snapshots"); - ChangelogDirTest rocks("./rocksdb"); - setSnapshotDirectory("./snapshots"); - setRocksDBDirectory("./rocksdb"); - - DB::KeeperSnapshotManager manager(3, keeper_context); - - DB::KeeperRocksStorage storage(500, "", keeper_context); - storage.getSessionID(130); - - for (size_t j = 1; j <= 5; ++j) - { - for (size_t i = (j - 1) * 50; i < j * 50; ++i) - { - addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i)); - } - - DB::KeeperStorageSnapshot snapshot(&storage, j * 50); - auto buf = manager.serializeSnapshotToBuffer(snapshot); - manager.serializeSnapshotBufferToDisk(*buf, j * 50); - EXPECT_TRUE(fs::exists(std::string{"./snapshots/snapshot_"} + std::to_string(j * 50) + ".bin.zstd")); - } - - EXPECT_FALSE(fs::exists("./snapshots/snapshot_50.bin.zstd")); - EXPECT_FALSE(fs::exists("./snapshots/snapshot_100.bin.zstd")); - EXPECT_TRUE(fs::exists("./snapshots/snapshot_150.bin.zstd")); - EXPECT_TRUE(fs::exists("./snapshots/snapshot_200.bin.zstd")); - EXPECT_TRUE(fs::exists("./snapshots/snapshot_250.bin.zstd")); - - - auto [restored_storage, meta, _] = manager.restoreFromLatestSnapshot(); - - EXPECT_EQ(restored_storage->container.size(), 254); - - for (size_t i = 0; i < 250; ++i) - { - EXPECT_EQ(restored_storage->container.find("/hello_" + std::to_string(i))->value.getData(), "world_" + std::to_string(i)); - } -} - -TEST_F(RocksKeeperTest, TestStorageSnapshotMode) -{ - ChangelogDirTest test("./snapshots"); - setSnapshotDirectory("./snapshots"); - ChangelogDirTest rocks("./rocksdb"); - setRocksDBDirectory("./rocksdb"); - - DB::KeeperSnapshotManager manager(3, keeper_context); - DB::KeeperRocksStorage storage(500, "", keeper_context); - for (size_t i = 0; i < 50; ++i) - { - addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i)); - } - - { - DB::KeeperStorageSnapshot snapshot(&storage, 50); - for (size_t i = 0; i < 50; ++i) - { - addNode(storage, "/hello_" + std::to_string(i), "wlrd_" + std::to_string(i)); - } - for (size_t i = 0; i < 50; ++i) - { - EXPECT_EQ(storage.container.find("/hello_" + std::to_string(i))->value.getData(), "wlrd_" + std::to_string(i)); - } - for (size_t i = 0; i < 50; ++i) - { - if (i % 2 == 0) - storage.container.erase("/hello_" + std::to_string(i)); - } - EXPECT_EQ(storage.container.size(), 29); - EXPECT_EQ(storage.container.snapshotSizeWithVersion().first, 54); - EXPECT_EQ(storage.container.snapshotSizeWithVersion().second, 1); - auto buf = manager.serializeSnapshotToBuffer(snapshot); - manager.serializeSnapshotBufferToDisk(*buf, 50); - } - EXPECT_TRUE(fs::exists("./snapshots/snapshot_50.bin.zstd")); - EXPECT_EQ(storage.container.size(), 29); - storage.clearGarbageAfterSnapshot(); - for (size_t i = 0; i < 50; ++i) - { - if (i % 2 != 0) - EXPECT_EQ(storage.container.find("/hello_" + std::to_string(i))->value.getData(), "wlrd_" + std::to_string(i)); - else - EXPECT_FALSE(storage.container.contains("/hello_" + std::to_string(i))); - } - - auto [restored_storage, meta, _] = manager.restoreFromLatestSnapshot(); - - for (size_t i = 0; i < 50; ++i) - { - EXPECT_EQ(restored_storage->container.find("/hello_" + std::to_string(i))->value.getData(), "world_" + std::to_string(i)); - } -} - -TEST_F(RocksKeeperTest, TestStorageSnapshotBroken) -{ - ChangelogDirTest test("./snapshots"); - setSnapshotDirectory("./snapshots"); - ChangelogDirTest rocks("./rocksdb"); - setRocksDBDirectory("./rocksdb"); - - DB::KeeperSnapshotManager manager(3, keeper_context); - DB::KeeperRocksStorage storage(500, "", keeper_context); - for (size_t i = 0; i < 50; ++i) - { - addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i)); - } - { - DB::KeeperStorageSnapshot snapshot(&storage, 50); - auto buf = manager.serializeSnapshotToBuffer(snapshot); - manager.serializeSnapshotBufferToDisk(*buf, 50); - } - EXPECT_TRUE(fs::exists("./snapshots/snapshot_50.bin.zstd")); - - /// Let's corrupt file - DB::WriteBufferFromFile plain_buf( - "./snapshots/snapshot_50.bin.zstd", DB::DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY); - plain_buf.truncate(34); - plain_buf.sync(); - - EXPECT_THROW(manager.restoreFromLatestSnapshot(), DB::Exception); -} - -nuraft::ptr getBufferFromZKRequest(int64_t session_id, int64_t zxid, const Coordination::ZooKeeperRequestPtr & request); - -nuraft::ptr -getLogEntryFromZKRequest(size_t term, int64_t session_id, int64_t zxid, const Coordination::ZooKeeperRequestPtr & request); - -static void testLogAndStateMachine( - DB::CoordinationSettingsPtr settings, - uint64_t total_logs, - bool enable_compression = true) -{ - using namespace Coordination; - using namespace DB; - - ChangelogDirTest snapshots("./snapshots"); - ChangelogDirTest logs("./logs"); - ChangelogDirTest rocks("./rocksdb"); - - auto get_keeper_context = [&] - { - auto local_keeper_context = std::make_shared(true, settings); - local_keeper_context->setSnapshotDisk(std::make_shared("SnapshotDisk", "./snapshots")); - local_keeper_context->setLogDisk(std::make_shared("LogDisk", "./logs")); - local_keeper_context->setRocksDBDisk(std::make_shared("RocksDisk", "./rocksdb")); - local_keeper_context->setRocksDBOptions(nullptr); - return local_keeper_context; - }; - - ResponsesQueue queue(std::numeric_limits::max()); - SnapshotsQueue snapshots_queue{1}; - - auto keeper_context = get_keeper_context(); - auto state_machine = std::make_shared>(queue, snapshots_queue, keeper_context, nullptr); - - state_machine->init(); - DB::KeeperLogStore changelog( - DB::LogFileSettings{ - .force_sync = true, .compress_logs = enable_compression, .rotate_interval = settings->rotate_log_storage_interval}, - DB::FlushSettings(), - keeper_context); - changelog.init(state_machine->last_commit_index() + 1, settings->reserved_log_items); - - for (size_t i = 1; i < total_logs + 1; ++i) - { - std::shared_ptr request = std::make_shared(); - request->path = "/hello_" + std::to_string(i); - auto entry = getLogEntryFromZKRequest(0, 1, i, request); - changelog.append(entry); - changelog.end_of_append_batch(0, 0); - - waitDurableLogs(changelog); - - state_machine->pre_commit(i, changelog.entry_at(i)->get_buf()); - state_machine->commit(i, changelog.entry_at(i)->get_buf()); - bool snapshot_created = false; - if (i % settings->snapshot_distance == 0) - { - nuraft::snapshot s(i, 0, std::make_shared()); - nuraft::async_result::handler_type when_done - = [&snapshot_created](bool & ret, nuraft::ptr & /*exception*/) - { - snapshot_created = ret; - }; - - state_machine->create_snapshot(s, when_done); - CreateSnapshotTask snapshot_task; - bool pop_result = snapshots_queue.pop(snapshot_task); - EXPECT_TRUE(pop_result); - - snapshot_task.create_snapshot(std::move(snapshot_task.snapshot), false); - } - - if (snapshot_created && changelog.size() > settings->reserved_log_items) - changelog.compact(i - settings->reserved_log_items); - } - - SnapshotsQueue snapshots_queue1{1}; - keeper_context = get_keeper_context(); - auto restore_machine = std::make_shared>(queue, snapshots_queue1, keeper_context, nullptr); - restore_machine->init(); - EXPECT_EQ(restore_machine->last_commit_index(), total_logs - total_logs % settings->snapshot_distance); - - DB::KeeperLogStore restore_changelog( - DB::LogFileSettings{ - .force_sync = true, .compress_logs = enable_compression, .rotate_interval = settings->rotate_log_storage_interval}, - DB::FlushSettings(), - keeper_context); - restore_changelog.init(restore_machine->last_commit_index() + 1, settings->reserved_log_items); - - EXPECT_EQ(restore_changelog.size(), std::min(settings->reserved_log_items + total_logs % settings->snapshot_distance, total_logs)); - EXPECT_EQ(restore_changelog.next_slot(), total_logs + 1); - if (total_logs > settings->reserved_log_items + 1) - EXPECT_EQ( - restore_changelog.start_index(), total_logs - total_logs % settings->snapshot_distance - settings->reserved_log_items + 1); - else - EXPECT_EQ(restore_changelog.start_index(), 1); - - for (size_t i = restore_machine->last_commit_index() + 1; i < restore_changelog.next_slot(); ++i) - { - restore_machine->pre_commit(i, changelog.entry_at(i)->get_buf()); - restore_machine->commit(i, changelog.entry_at(i)->get_buf()); - } - - auto & source_storage = state_machine->getStorageUnsafe(); - auto & restored_storage = restore_machine->getStorageUnsafe(); - - EXPECT_EQ(source_storage.container.size(), restored_storage.container.size()); - for (size_t i = 1; i < total_logs + 1; ++i) - { - auto path = "/hello_" + std::to_string(i); - EXPECT_EQ(source_storage.container.find(path)->value.getData(), restored_storage.container.find(path)->value.getData()); - } -} - -TEST_F(RocksKeeperTest, TestStateMachineAndLogStore) -{ - using namespace Coordination; - using namespace DB; - - { - CoordinationSettingsPtr settings = std::make_shared(); - settings->snapshot_distance = 10; - settings->reserved_log_items = 10; - settings->rotate_log_storage_interval = 10; - - testLogAndStateMachine(settings, 37); - } - { - CoordinationSettingsPtr settings = std::make_shared(); - settings->snapshot_distance = 10; - settings->reserved_log_items = 10; - settings->rotate_log_storage_interval = 10; - testLogAndStateMachine(settings, 11); - } - { - CoordinationSettingsPtr settings = std::make_shared(); - settings->snapshot_distance = 10; - settings->reserved_log_items = 10; - settings->rotate_log_storage_interval = 10; - testLogAndStateMachine(settings, 40); - } - { - CoordinationSettingsPtr settings = std::make_shared(); - settings->snapshot_distance = 10; - settings->reserved_log_items = 20; - settings->rotate_log_storage_interval = 30; - testLogAndStateMachine(settings, 40); - } - { - CoordinationSettingsPtr settings = std::make_shared(); - settings->snapshot_distance = 10; - settings->reserved_log_items = 0; - settings->rotate_log_storage_interval = 10; - testLogAndStateMachine(settings, 40); - } - { - CoordinationSettingsPtr settings = std::make_shared(); - settings->snapshot_distance = 1; - settings->reserved_log_items = 1; - settings->rotate_log_storage_interval = 32; - testLogAndStateMachine(settings, 32); - } - { - CoordinationSettingsPtr settings = std::make_shared(); - settings->snapshot_distance = 10; - settings->reserved_log_items = 7; - settings->rotate_log_storage_interval = 1; - testLogAndStateMachine(settings, 33); - } - { - CoordinationSettingsPtr settings = std::make_shared(); - settings->snapshot_distance = 37; - settings->reserved_log_items = 1000; - settings->rotate_log_storage_interval = 5000; - testLogAndStateMachine(settings, 33); - } - { - CoordinationSettingsPtr settings = std::make_shared(); - settings->snapshot_distance = 37; - settings->reserved_log_items = 1000; - settings->rotate_log_storage_interval = 5000; - testLogAndStateMachine(settings, 45); - } -} - -TEST_F(RocksKeeperTest, TestEphemeralNodeRemove) -{ - using namespace Coordination; - using namespace DB; - - ChangelogDirTest snapshots("./snapshots"); - setSnapshotDirectory("./snapshots"); - ChangelogDirTest rocks("./rocksdb"); - setRocksDBDirectory("./rocksdb"); - - ResponsesQueue queue(std::numeric_limits::max()); - SnapshotsQueue snapshots_queue{1}; - - auto state_machine = std::make_shared>(queue, snapshots_queue, keeper_context, nullptr); - state_machine->init(); - - std::shared_ptr request_c = std::make_shared(); - request_c->path = "/hello"; - request_c->is_ephemeral = true; - auto entry_c = getLogEntryFromZKRequest(0, 1, state_machine->getNextZxid(), request_c); - state_machine->pre_commit(1, entry_c->get_buf()); - state_machine->commit(1, entry_c->get_buf()); - const auto & storage = state_machine->getStorageUnsafe(); - - EXPECT_EQ(storage.ephemerals.size(), 1); - std::shared_ptr request_d = std::make_shared(); - request_d->path = "/hello"; - /// Delete from other session - auto entry_d = getLogEntryFromZKRequest(0, 2, state_machine->getNextZxid(), request_d); - state_machine->pre_commit(2, entry_d->get_buf()); - state_machine->commit(2, entry_d->get_buf()); - - EXPECT_EQ(storage.ephemerals.size(), 0); -} - -TEST_F(RocksKeeperTest, TestCreateNodeWithAuthSchemeForAclWhenAuthIsPrecommitted) -{ - using namespace Coordination; - using namespace DB; - - ChangelogDirTest snapshots("./snapshots"); - setSnapshotDirectory("./snapshots"); - ChangelogDirTest rocks("./rocksdb"); - setRocksDBDirectory("./rocksdb"); - ResponsesQueue queue(std::numeric_limits::max()); - SnapshotsQueue snapshots_queue{1}; - - auto state_machine = std::make_shared>(queue, snapshots_queue, keeper_context, nullptr); - state_machine->init(); - - String user_auth_data = "test_user:test_password"; - String digest = KeeperRocksStorage::generateDigest(user_auth_data); - - std::shared_ptr auth_req = std::make_shared(); - auth_req->scheme = "digest"; - auth_req->data = user_auth_data; - - // Add auth data to the session - auto auth_entry = getLogEntryFromZKRequest(0, 1, state_machine->getNextZxid(), auth_req); - state_machine->pre_commit(1, auth_entry->get_buf()); - - // Create a node with 'auth' scheme for ACL - String node_path = "/hello"; - std::shared_ptr create_req = std::make_shared(); - create_req->path = node_path; - // When 'auth' scheme is used the creator must have been authenticated by the server (for example, using 'digest' scheme) before it can - // create nodes with this ACL. - create_req->acls = {{.permissions = 31, .scheme = "auth", .id = ""}}; - auto create_entry = getLogEntryFromZKRequest(0, 1, state_machine->getNextZxid(), create_req); - state_machine->pre_commit(2, create_entry->get_buf()); - - const auto & uncommitted_state = state_machine->getStorageUnsafe().uncommitted_state; - ASSERT_TRUE(uncommitted_state.nodes.contains(node_path)); - - // commit log entries - state_machine->commit(1, auth_entry->get_buf()); - state_machine->commit(2, create_entry->get_buf()); - - auto node = uncommitted_state.getNode(node_path); - ASSERT_NE(node, nullptr); - auto acls = uncommitted_state.getACLs(node_path); - ASSERT_EQ(acls.size(), 1); - EXPECT_EQ(acls[0].scheme, "digest"); - EXPECT_EQ(acls[0].id, digest); - EXPECT_EQ(acls[0].permissions, 31); -} - -TEST_F(RocksKeeperTest, TestSetACLWithAuthSchemeForAclWhenAuthIsPrecommitted) -{ - using namespace Coordination; - using namespace DB; - - ChangelogDirTest snapshots("./snapshots"); - setSnapshotDirectory("./snapshots"); - ChangelogDirTest rocks("./rocksdb"); - setRocksDBDirectory("./rocksdb"); - - ResponsesQueue queue(std::numeric_limits::max()); - SnapshotsQueue snapshots_queue{1}; - - auto state_machine = std::make_shared>(queue, snapshots_queue, keeper_context, nullptr); - state_machine->init(); - - String user_auth_data = "test_user:test_password"; - String digest = KeeperRocksStorage::generateDigest(user_auth_data); - - std::shared_ptr auth_req = std::make_shared(); - auth_req->scheme = "digest"; - auth_req->data = user_auth_data; - - // Add auth data to the session - auto auth_entry = getLogEntryFromZKRequest(0, 1, state_machine->getNextZxid(), auth_req); - state_machine->pre_commit(1, auth_entry->get_buf()); - - // Create a node - String node_path = "/hello"; - std::shared_ptr create_req = std::make_shared(); - create_req->path = node_path; - auto create_entry = getLogEntryFromZKRequest(0, 1, state_machine->getNextZxid(), create_req); - state_machine->pre_commit(2, create_entry->get_buf()); - - // Set ACL with 'auth' scheme for ACL - std::shared_ptr set_acl_req = std::make_shared(); - set_acl_req->path = node_path; - // When 'auth' scheme is used the creator must have been authenticated by the server (for example, using 'digest' scheme) before it can - // set this ACL. - set_acl_req->acls = {{.permissions = 31, .scheme = "auth", .id = ""}}; - auto set_acl_entry = getLogEntryFromZKRequest(0, 1, state_machine->getNextZxid(), set_acl_req); - state_machine->pre_commit(3, set_acl_entry->get_buf()); - - // commit all entries - state_machine->commit(1, auth_entry->get_buf()); - state_machine->commit(2, create_entry->get_buf()); - state_machine->commit(3, set_acl_entry->get_buf()); - - const auto & uncommitted_state = state_machine->getStorageUnsafe().uncommitted_state; - auto node = uncommitted_state.getNode(node_path); - - ASSERT_NE(node, nullptr); - auto acls = uncommitted_state.getACLs(node_path); - ASSERT_EQ(acls.size(), 1); - EXPECT_EQ(acls[0].scheme, "digest"); - EXPECT_EQ(acls[0].id, digest); - EXPECT_EQ(acls[0].permissions, 31); -} - -TEST_F(RocksKeeperTest, TestStorageSnapshotEqual) -{ - ChangelogDirTest test("./snapshots"); - ChangelogDirTest rocks("./rocksdb"); - setSnapshotDirectory("./snapshots"); - setRocksDBDirectory("./rocksdb"); - - std::optional snapshot_hash; - for (size_t i = 0; i < 15; ++i) - { - DB::KeeperSnapshotManager manager(3, keeper_context); - - DB::KeeperRocksStorage storage(500, "", keeper_context); - addNode(storage, "/hello", ""); - for (size_t j = 0; j < 100; ++j) - { - addNode(storage, "/hello_" + std::to_string(j), "world", 1); - addNode(storage, "/hello/somepath_" + std::to_string(j), "somedata", 3); - } - - storage.session_id_counter = 5; - - storage.ephemerals[3] = {"/hello"}; - storage.ephemerals[1] = {"/hello/somepath"}; - - for (size_t j = 0; j < 3333; ++j) - storage.getSessionID(130 * j); - - DB::KeeperStorageSnapshot snapshot(&storage, storage.zxid); - - auto buf = manager.serializeSnapshotToBuffer(snapshot); - - auto new_hash = sipHash128(reinterpret_cast(buf->data()), buf->size()); - if (!snapshot_hash.has_value()) - { - snapshot_hash = new_hash; - } - else - { - EXPECT_EQ(*snapshot_hash, new_hash); - } - } -} - -TEST_F(RocksKeeperTest, TestStorageSnapshotDifferentCompressions) -{ - ChangelogDirTest test("./snapshots"); - setSnapshotDirectory("./snapshots"); - ChangelogDirTest rocks("./rocksdb"); - setRocksDBDirectory("./rocksdb"); - - DB::KeeperSnapshotManager manager(3, keeper_context); - - DB::KeeperRocksStorage storage(500, "", keeper_context); - addNode(storage, "/hello1", "world", 1); - addNode(storage, "/hello2", "somedata", 3); - storage.session_id_counter = 5; - storage.zxid = 2; - storage.ephemerals[3] = {"/hello2"}; - storage.ephemerals[1] = {"/hello1"}; - storage.getSessionID(130); - storage.getSessionID(130); - - DB::KeeperStorageSnapshot snapshot(&storage, 2); - - auto buf = manager.serializeSnapshotToBuffer(snapshot); - manager.serializeSnapshotBufferToDisk(*buf, 2); - EXPECT_TRUE(fs::exists("./snapshots/snapshot_2.bin.zstd")); - - DB::KeeperSnapshotManager new_manager(3, keeper_context, false); - - auto debuf = new_manager.deserializeSnapshotBufferFromDisk(2); - - auto [restored_storage, snapshot_meta, _] = new_manager.deserializeSnapshotFromBuffer(debuf); - - EXPECT_EQ(restored_storage->container.size(), 6); - EXPECT_EQ(restored_storage->container.find("/")->value.numChildren(), 3); - EXPECT_EQ(restored_storage->container.find("/hello1")->value.numChildren(), 0); - EXPECT_EQ(restored_storage->container.find("/hello2")->value.numChildren(), 0); - - EXPECT_EQ(restored_storage->container.find("/")->value.getData(), ""); - EXPECT_EQ(restored_storage->container.find("/hello1")->value.getData(), "world"); - EXPECT_EQ(restored_storage->container.find("/hello2")->value.getData(), "somedata"); - EXPECT_EQ(restored_storage->session_id_counter, 7); - EXPECT_EQ(restored_storage->zxid, 2); - EXPECT_EQ(restored_storage->ephemerals.size(), 2); - EXPECT_EQ(restored_storage->ephemerals[3].size(), 1); - EXPECT_EQ(restored_storage->ephemerals[1].size(), 1); - EXPECT_EQ(restored_storage->session_and_timeout.size(), 2); -} - -template -ResponseType getSingleResponse(const auto & responses) -{ - EXPECT_FALSE(responses.empty()); - return dynamic_cast(*responses[0].response); -} - -TEST_F(RocksKeeperTest, TestUncommittedStateBasicCrud) -{ - ChangelogDirTest test("./rocksdb"); - setRocksDBDirectory("./rocksdb"); - - using namespace DB; - using namespace Coordination; - - DB::KeeperRocksStorage storage{500, "", keeper_context}; - - constexpr std::string_view path = "/test"; - - const auto get_committed_data = [&]() -> std::optional - { - auto request = std::make_shared(); - request->path = path; - auto responses = storage.processRequest(request, 0, std::nullopt, true, true); - const auto & get_response = getSingleResponse(responses); - - if (get_response.error != Error::ZOK) - return std::nullopt; - - return get_response.data; - }; - - const auto preprocess_get = [&](int64_t zxid) - { - auto get_request = std::make_shared(); - get_request->path = path; - storage.preprocessRequest(get_request, 0, 0, zxid); - return get_request; - }; - - const auto create_request = std::make_shared(); - create_request->path = path; - create_request->data = "initial_data"; - storage.preprocessRequest(create_request, 0, 0, 1); - storage.preprocessRequest(create_request, 0, 0, 2); - - ASSERT_FALSE(get_committed_data()); - - const auto after_create_get = preprocess_get(3); - - ASSERT_FALSE(get_committed_data()); - - const auto set_request = std::make_shared(); - set_request->path = path; - set_request->data = "new_data"; - storage.preprocessRequest(set_request, 0, 0, 4); - - const auto after_set_get = preprocess_get(5); - - ASSERT_FALSE(get_committed_data()); - - const auto remove_request = std::make_shared(); - remove_request->path = path; - storage.preprocessRequest(remove_request, 0, 0, 6); - storage.preprocessRequest(remove_request, 0, 0, 7); - - const auto after_remove_get = preprocess_get(8); - - ASSERT_FALSE(get_committed_data()); - - { - const auto responses = storage.processRequest(create_request, 0, 1); - const auto & create_response = getSingleResponse(responses); - ASSERT_EQ(create_response.error, Error::ZOK); - } - - { - const auto responses = storage.processRequest(create_request, 0, 2); - const auto & create_response = getSingleResponse(responses); - ASSERT_EQ(create_response.error, Error::ZNODEEXISTS); - } - - { - const auto responses = storage.processRequest(after_create_get, 0, 3); - const auto & get_response = getSingleResponse(responses); - ASSERT_EQ(get_response.error, Error::ZOK); - ASSERT_EQ(get_response.data, "initial_data"); - } - - ASSERT_EQ(get_committed_data(), "initial_data"); - - { - const auto responses = storage.processRequest(set_request, 0, 4); - const auto & create_response = getSingleResponse(responses); - ASSERT_EQ(create_response.error, Error::ZOK); - } - - { - const auto responses = storage.processRequest(after_set_get, 0, 5); - const auto & get_response = getSingleResponse(responses); - ASSERT_EQ(get_response.error, Error::ZOK); - ASSERT_EQ(get_response.data, "new_data"); - } - - ASSERT_EQ(get_committed_data(), "new_data"); - - { - const auto responses = storage.processRequest(remove_request, 0, 6); - const auto & create_response = getSingleResponse(responses); - ASSERT_EQ(create_response.error, Error::ZOK); - } - - { - const auto responses = storage.processRequest(remove_request, 0, 7); - const auto & remove_response = getSingleResponse(responses); - ASSERT_EQ(remove_response.error, Error::ZNONODE); - } - - { - const auto responses = storage.processRequest(after_remove_get, 0, 8); - const auto & get_response = getSingleResponse(responses); - ASSERT_EQ(get_response.error, Error::ZNONODE); - } - - ASSERT_FALSE(get_committed_data()); -} - -TEST_F(RocksKeeperTest, TestListRequestTypes) -{ - ChangelogDirTest test("./rocksdb"); - setRocksDBDirectory("./rocksdb"); - - using namespace DB; - using namespace Coordination; - KeeperRocksStorage storage{500, "", keeper_context}; - - int32_t zxid = 0; - - static constexpr std::string_view test_path = "/list_request_type/node"; - - const auto create_path = [&](const auto & path, bool is_ephemeral, bool is_sequential = true) - { - const auto create_request = std::make_shared(); - int new_zxid = ++zxid; - create_request->path = path; - create_request->is_sequential = is_sequential; - create_request->is_ephemeral = is_ephemeral; - storage.preprocessRequest(create_request, 1, 0, new_zxid); - auto responses = storage.processRequest(create_request, 1, new_zxid); - - EXPECT_GE(responses.size(), 1); - EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK) << "Failed to create " << path; - const auto & create_response = dynamic_cast(*responses[0].response); - return create_response.path_created; - }; - - create_path(parentNodePath(StringRef{test_path}).toString(), false, false); - - static constexpr size_t persistent_num = 5; - std::unordered_set expected_persistent_children; - for (size_t i = 0; i < persistent_num; ++i) - { - expected_persistent_children.insert(getBaseNodeName(create_path(test_path, false)).toString()); - } - ASSERT_EQ(expected_persistent_children.size(), persistent_num); - - static constexpr size_t ephemeral_num = 5; - std::unordered_set expected_ephemeral_children; - for (size_t i = 0; i < ephemeral_num; ++i) - { - expected_ephemeral_children.insert(getBaseNodeName(create_path(test_path, true)).toString()); - } - ASSERT_EQ(expected_ephemeral_children.size(), ephemeral_num); - - const auto get_children = [&](const auto list_request_type) - { - const auto list_request = std::make_shared(); - int new_zxid = ++zxid; - list_request->path = parentNodePath(StringRef{test_path}).toString(); - list_request->list_request_type = list_request_type; - storage.preprocessRequest(list_request, 1, 0, new_zxid); - auto responses = storage.processRequest(list_request, 1, new_zxid); - - EXPECT_GE(responses.size(), 1); - const auto & list_response = dynamic_cast(*responses[0].response); - EXPECT_EQ(list_response.error, Coordination::Error::ZOK); - return list_response.names; - }; - - const auto persistent_children = get_children(ListRequestType::PERSISTENT_ONLY); - EXPECT_EQ(persistent_children.size(), persistent_num); - for (const auto & child : persistent_children) - { - EXPECT_TRUE(expected_persistent_children.contains(child)) << "Missing persistent child " << child; - } - - const auto ephemeral_children = get_children(ListRequestType::EPHEMERAL_ONLY); - EXPECT_EQ(ephemeral_children.size(), ephemeral_num); - for (const auto & child : ephemeral_children) - { - EXPECT_TRUE(expected_ephemeral_children.contains(child)) << "Missing ephemeral child " << child; - } - - const auto all_children = get_children(ListRequestType::ALL); - EXPECT_EQ(all_children.size(), ephemeral_num + persistent_num); - for (const auto & child : all_children) - { - EXPECT_TRUE(expected_ephemeral_children.contains(child) || expected_persistent_children.contains(child)) - << "Missing child " << child; - } -} - -TEST_F(RocksKeeperTest, TestFeatureFlags) -{ - ChangelogDirTest test("./rocksdb"); - setRocksDBDirectory("./rocksdb"); - - using namespace Coordination; - KeeperMemoryStorage storage{500, "", keeper_context}; - auto request = std::make_shared(); - request->path = DB::keeper_api_feature_flags_path; - auto responses = storage.processRequest(request, 0, std::nullopt, true, true); - const auto & get_response = getSingleResponse(responses); - DB::KeeperFeatureFlags feature_flags; - feature_flags.setFeatureFlags(get_response.data); - ASSERT_TRUE(feature_flags.isEnabled(KeeperFeatureFlag::FILTERED_LIST)); - ASSERT_TRUE(feature_flags.isEnabled(KeeperFeatureFlag::MULTI_READ)); - ASSERT_FALSE(feature_flags.isEnabled(KeeperFeatureFlag::CHECK_NOT_EXISTS)); -} - -TEST_F(RocksKeeperTest, TestSystemNodeModify) -{ - ChangelogDirTest test("./rocksdb"); - setRocksDBDirectory("./rocksdb"); - - using namespace Coordination; - int64_t zxid{0}; - - // On INIT we abort when a system path is modified - keeper_context->setServerState(KeeperContext::Phase::RUNNING); - KeeperRocksStorage storage{500, "", keeper_context}; - const auto assert_create = [&](const std::string_view path, const auto expected_code) - { - auto request = std::make_shared(); - request->path = path; - storage.preprocessRequest(request, 0, 0, zxid); - auto responses = storage.processRequest(request, 0, zxid); - ASSERT_FALSE(responses.empty()); - - const auto & response = responses[0]; - ASSERT_EQ(response.response->error, expected_code) << "Unexpected error for path " << path; - - ++zxid; - }; - - assert_create("/keeper", Error::ZBADARGUMENTS); - assert_create("/keeper/with_child", Error::ZBADARGUMENTS); - assert_create(DB::keeper_api_version_path, Error::ZBADARGUMENTS); - - assert_create("/keeper_map", Error::ZOK); - assert_create("/keeper1", Error::ZOK); - assert_create("/keepe", Error::ZOK); - assert_create("/keeper1/test", Error::ZOK); -} - -TEST_F(RocksKeeperTest, TestCheckNotExistsRequest) -{ - ChangelogDirTest test("./rocksdb"); - setRocksDBDirectory("./rocksdb"); - - using namespace DB; - using namespace Coordination; - - KeeperRocksStorage storage{500, "", keeper_context}; - - int32_t zxid = 0; - - const auto create_path = [&](const auto & path) - { - const auto create_request = std::make_shared(); - int new_zxid = ++zxid; - create_request->path = path; - storage.preprocessRequest(create_request, 1, 0, new_zxid); - auto responses = storage.processRequest(create_request, 1, new_zxid); - - EXPECT_GE(responses.size(), 1); - EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK) << "Failed to create " << path; - }; - - const auto check_request = std::make_shared(); - check_request->path = "/test_node"; - check_request->not_exists = true; - - { - SCOPED_TRACE("CheckNotExists returns ZOK"); - int new_zxid = ++zxid; - storage.preprocessRequest(check_request, 1, 0, new_zxid); - auto responses = storage.processRequest(check_request, 1, new_zxid); - EXPECT_GE(responses.size(), 1); - auto error = responses[0].response->error; - EXPECT_EQ(error, Coordination::Error::ZOK) << "CheckNotExists returned invalid result: " << errorMessage(error); - } - - create_path("/test_node"); - auto node_it = storage.container.find("/test_node"); - ASSERT_NE(node_it, storage.container.end()); - auto node_version = node_it->value.version; - - { - SCOPED_TRACE("CheckNotExists returns ZNODEEXISTS"); - int new_zxid = ++zxid; - storage.preprocessRequest(check_request, 1, 0, new_zxid); - auto responses = storage.processRequest(check_request, 1, new_zxid); - EXPECT_GE(responses.size(), 1); - auto error = responses[0].response->error; - EXPECT_EQ(error, Coordination::Error::ZNODEEXISTS) << "CheckNotExists returned invalid result: " << errorMessage(error); - } - - { - SCOPED_TRACE("CheckNotExists returns ZNODEEXISTS for same version"); - int new_zxid = ++zxid; - check_request->version = node_version; - storage.preprocessRequest(check_request, 1, 0, new_zxid); - auto responses = storage.processRequest(check_request, 1, new_zxid); - EXPECT_GE(responses.size(), 1); - auto error = responses[0].response->error; - EXPECT_EQ(error, Coordination::Error::ZNODEEXISTS) << "CheckNotExists returned invalid result: " << errorMessage(error); - } - - { - SCOPED_TRACE("CheckNotExists returns ZOK for different version"); - int new_zxid = ++zxid; - check_request->version = node_version + 1; - storage.preprocessRequest(check_request, 1, 0, new_zxid); - auto responses = storage.processRequest(check_request, 1, new_zxid); - EXPECT_GE(responses.size(), 1); - auto error = responses[0].response->error; - EXPECT_EQ(error, Coordination::Error::ZOK) << "CheckNotExists returned invalid result: " << errorMessage(error); - } -} - -TEST_F(RocksKeeperTest, TestReapplyingDeltas) -{ - ChangelogDirTest test("./rocksdb"); - setRocksDBDirectory("./rocksdb"); - - using namespace DB; - using namespace Coordination; - - static constexpr int64_t initial_zxid = 100; - - const auto create_request = std::make_shared(); - create_request->path = "/test/data"; - create_request->is_sequential = true; - - const auto process_create = [](KeeperRocksStorage & storage, const auto & request, int64_t zxid) - { - storage.preprocessRequest(request, 1, 0, zxid); - auto responses = storage.processRequest(request, 1, zxid); - EXPECT_GE(responses.size(), 1); - EXPECT_EQ(responses[0].response->error, Error::ZOK); - }; - - const auto commit_initial_data = [&](auto & storage) - { - int64_t zxid = 1; - - const auto root_create = std::make_shared(); - root_create->path = "/test"; - process_create(storage, root_create, zxid); - ++zxid; - - for (; zxid <= initial_zxid; ++zxid) - process_create(storage, create_request, zxid); - }; - - KeeperRocksStorage storage1{500, "", keeper_context}; - commit_initial_data(storage1); - - for (int64_t zxid = initial_zxid + 1; zxid < initial_zxid + 50; ++zxid) - storage1.preprocessRequest(create_request, 1, 0, zxid, /*check_acl=*/true, /*digest=*/std::nullopt, /*log_idx=*/zxid); - - /// create identical new storage - KeeperRocksStorage storage2{500, "", keeper_context}; - commit_initial_data(storage2); - - storage1.applyUncommittedState(storage2, initial_zxid); - - const auto commit_unprocessed = [&](KeeperRocksStorage & storage) - { - for (int64_t zxid = initial_zxid + 1; zxid < initial_zxid + 50; ++zxid) - { - auto responses = storage.processRequest(create_request, 1, zxid); - EXPECT_GE(responses.size(), 1); - EXPECT_EQ(responses[0].response->error, Error::ZOK); - } - }; - - commit_unprocessed(storage1); - commit_unprocessed(storage2); - - const auto get_children = [&](KeeperRocksStorage & storage) - { - const auto list_request = std::make_shared(); - list_request->path = "/test"; - auto responses = storage.processRequest(list_request, 1, std::nullopt, /*check_acl=*/true, /*is_local=*/true); - EXPECT_EQ(responses.size(), 1); - const auto * list_response = dynamic_cast(responses[0].response.get()); - EXPECT_TRUE(list_response); - return list_response->names; - }; - - auto children1 = get_children(storage1); - std::unordered_set children1_set(children1.begin(), children1.end()); - - auto children2 = get_children(storage2); - std::unordered_set children2_set(children2.begin(), children2.end()); - - ASSERT_TRUE(children1_set == children2_set); -} - -#endif