From 3f5ccabba60de0181319822292664452393a059c Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Tue, 14 Nov 2023 08:32:37 +0000 Subject: [PATCH 1/4] Don't append different log types in same file --- src/Coordination/Changelog.cpp | 12 ++++++++++-- src/Coordination/Changelog.h | 1 + 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/Coordination/Changelog.cpp b/src/Coordination/Changelog.cpp index 3c2004a1b75..11f73da63d9 100644 --- a/src/Coordination/Changelog.cpp +++ b/src/Coordination/Changelog.cpp @@ -14,6 +14,7 @@ #include #include #include +#include "IO/CompressionMethod.h" #include @@ -476,6 +477,9 @@ struct ChangelogReadResult /// last offset we were able to read from log off_t last_position; + + /// Whether the changelog file was written using compression + bool compressed_log; bool error; }; @@ -484,7 +488,7 @@ class ChangelogReader public: explicit ChangelogReader(DiskPtr disk_, const std::string & filepath_) : disk(disk_), filepath(filepath_) { - auto compression_method = chooseCompressionMethod(filepath, ""); + compression_method = chooseCompressionMethod(filepath, ""); auto read_buffer_from_file = disk->readFile(filepath); read_buf = wrapReadBufferWithCompressionMethod(std::move(read_buffer_from_file), compression_method); } @@ -493,6 +497,7 @@ public: ChangelogReadResult readChangelog(IndexToLogEntry & logs, uint64_t start_log_index, Poco::Logger * log) { ChangelogReadResult result{}; + result.compressed_log = compression_method != CompressionMethod::None; try { while (!read_buf->eof()) @@ -583,6 +588,7 @@ public: private: DiskPtr disk; std::string filepath; + CompressionMethod compression_method; std::unique_ptr read_buf; }; @@ -590,6 +596,7 @@ Changelog::Changelog( Poco::Logger * log_, LogFileSettings log_file_settings, FlushSettings flush_settings_, KeeperContextPtr keeper_context_) : changelogs_detached_dir("detached") , rotate_interval(log_file_settings.rotate_interval) + , compress_logs(log_file_settings.compress_logs) , log(log_) , write_operations(std::numeric_limits::max()) , append_completion_queue(std::numeric_limits::max()) @@ -830,7 +837,8 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin existing_changelogs.erase(last_log_read_result->log_start_index); std::erase_if(logs, [last_log_read_result](const auto & item) { return item.first >= last_log_read_result->log_start_index; }); } - else + /// don't mix compressed and uncompressed writes + else if (compress_logs == last_log_read_result->compressed_log) { initWriter(description); } diff --git a/src/Coordination/Changelog.h b/src/Coordination/Changelog.h index dddcb9aa218..68d8c810823 100644 --- a/src/Coordination/Changelog.h +++ b/src/Coordination/Changelog.h @@ -182,6 +182,7 @@ private: const String changelogs_detached_dir; const uint64_t rotate_interval; + const bool compress_logs; Poco::Logger * log; std::mutex writer_mutex; From 1ab7e5693de914f6147e701788d023315c2d6de8 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Tue, 14 Nov 2023 09:43:09 +0000 Subject: [PATCH 2/4] Add unit test --- src/Coordination/CoordinationSettings.h | 2 +- src/Coordination/tests/gtest_coordination.cpp | 96 +++++++++++++++++++ 2 files changed, 97 insertions(+), 1 deletion(-) diff --git a/src/Coordination/CoordinationSettings.h b/src/Coordination/CoordinationSettings.h index 3cbfa3e449d..a58f2b04797 100644 --- a/src/Coordination/CoordinationSettings.h +++ b/src/Coordination/CoordinationSettings.h @@ -45,7 +45,7 @@ struct Settings; M(UInt64, max_requests_quick_batch_size, 100, "Max size of batch of requests to try to get before proceeding with RAFT. Keeper will not wait for requests but take only requests that are already in queue" , 0) \ M(Bool, quorum_reads, false, "Execute read requests as writes through whole RAFT consesus with similar speed", 0) \ M(Bool, force_sync, true, "Call fsync on each change in RAFT changelog", 0) \ - M(Bool, compress_logs, true, "Write compressed coordination logs in ZSTD format", 0) \ + M(Bool, compress_logs, false, "Write compressed coordination logs in ZSTD format", 0) \ M(Bool, compress_snapshots_with_zstd_format, true, "Write compressed snapshots in ZSTD format (instead of custom LZ4)", 0) \ M(UInt64, configuration_change_tries_count, 20, "How many times we will try to apply configuration change (add/remove server) to the cluster", 0) \ M(UInt64, max_log_file_size, 50 * 1024 * 1024, "Max size of the Raft log file. If possible, each created log file will preallocate this amount of bytes on disk. Set to 0 to disable the limit", 0) \ diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index 418b5225fa4..3658df12f69 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -1102,6 +1102,102 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2) EXPECT_EQ(changelog_reader2.last_entry()->get_term(), 7777); } +TEST_F(CoordinationTest, ChangelogTestMixedLogTypes) +{ + ChangelogDirTest test("./logs"); + setLogDirectory("./logs"); + + std::vector changelog_files; + + const auto verify_changelog_files = [&] + { + for (const auto & log_file : changelog_files) + EXPECT_TRUE(fs::exists(log_file)) << "File " << log_file << " not found"; + }; + + size_t last_term = 0; + size_t log_size = 0; + + const auto append_log = [&](auto & changelog, const std::string & data, uint64_t term) + { + last_term = term; + ++log_size; + auto entry = getLogEntry(data, last_term); + changelog.append(entry); + }; + + const auto verify_log_content = [&](const auto & changelog) + { + EXPECT_EQ(changelog.size(), log_size); + EXPECT_EQ(changelog.last_entry()->get_term(), last_term); + }; + + { + SCOPED_TRACE("Initial uncompressed log"); + DB::KeeperLogStore changelog( + DB::LogFileSettings{.force_sync = true, .compress_logs = false, .rotate_interval = 20}, + DB::FlushSettings(), + keeper_context); + changelog.init(1, 0); + + for (size_t i = 0; i < 35; ++i) + append_log(changelog, std::to_string(i) + "_hello_world", (i+ 44) * 10); + + changelog.end_of_append_batch(0, 0); + + waitDurableLogs(changelog); + changelog_files.push_back("./logs/changelog_1_20.bin"); + changelog_files.push_back("./logs/changelog_21_40.bin"); + verify_changelog_files(); + + verify_log_content(changelog); + } + + { + SCOPED_TRACE("Compressed log"); + DB::KeeperLogStore changelog_compressed( + DB::LogFileSettings{.force_sync = true, .compress_logs = true, .rotate_interval = 20}, + DB::FlushSettings(), + keeper_context); + changelog_compressed.init(1, 0); + + verify_changelog_files(); + verify_log_content(changelog_compressed); + + append_log(changelog_compressed, "hello_world", 7777); + changelog_compressed.end_of_append_batch(0, 0); + + waitDurableLogs(changelog_compressed); + + verify_log_content(changelog_compressed); + + changelog_files.push_back("./logs/changelog_36_55.bin.zstd"); + verify_changelog_files(); + } + + { + SCOPED_TRACE("Final uncompressed log"); + DB::KeeperLogStore changelog( + DB::LogFileSettings{.force_sync = true, .compress_logs = false, .rotate_interval = 20}, + DB::FlushSettings(), + keeper_context); + changelog.init(1, 0); + + verify_changelog_files(); + verify_log_content(changelog); + + append_log(changelog, "hello_world", 7778); + changelog.end_of_append_batch(0, 0); + + waitDurableLogs(changelog); + + verify_log_content(changelog); + + changelog_files.push_back("./logs/changelog_37_56.bin"); + verify_changelog_files(); + } +} + TEST_P(CoordinationTest, ChangelogTestLostFiles) { auto params = GetParam(); From 03c3e968595512558f2cd4a3fddeec0ede75990c Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Tue, 14 Nov 2023 17:32:34 +0000 Subject: [PATCH 3/4] Small test changes --- src/Coordination/Changelog.cpp | 1 - src/Coordination/tests/gtest_coordination.cpp | 27 ++++++++----------- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/src/Coordination/Changelog.cpp b/src/Coordination/Changelog.cpp index 933eda485dd..c28cc368ac0 100644 --- a/src/Coordination/Changelog.cpp +++ b/src/Coordination/Changelog.cpp @@ -14,7 +14,6 @@ #include #include #include -#include "IO/CompressionMethod.h" #include diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index 1dc4ae1382c..2b5fd3424c0 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -1104,20 +1104,15 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2) } /// Truncating only some entries from the end -TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate3) +/// For compressed logs we have no reliable way of knowing how many log entries were lost +/// after we truncate some bytes from the end +TEST_F(CoordinationTest, ChangelogTestReadAfterBrokenTruncate3) { - auto params = GetParam(); - - /// For compressed logs we have no reliable way of knowing how many log entries were lost - /// after we truncate some bytes from the end - if (!params.extension.empty()) - return; - ChangelogDirTest test("./logs"); setLogDirectory("./logs"); DB::KeeperLogStore changelog( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20}, + DB::LogFileSettings{.force_sync = true, .compress_logs = false, .rotate_interval = 20}, DB::FlushSettings(), keeper_context); changelog.init(1, 0); @@ -1131,23 +1126,23 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate3) changelog.end_of_append_batch(0, 0); waitDurableLogs(changelog); - EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + params.extension)); - EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin")); DB::WriteBufferFromFile plain_buf( - "./logs/changelog_1_20.bin" + params.extension, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY); + "./logs/changelog_1_20.bin", DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY); plain_buf.truncate(plain_buf.size() - 30); DB::KeeperLogStore changelog_reader( - DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 20}, + DB::LogFileSettings{.force_sync = true, .compress_logs = false, .rotate_interval = 20}, DB::FlushSettings(), keeper_context); changelog_reader.init(1, 0); EXPECT_EQ(changelog_reader.size(), 19); - EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + params.extension)); - assertBrokenLogRemoved("./logs", "changelog_21_40.bin" + params.extension); - EXPECT_TRUE(fs::exists("./logs/changelog_20_39.bin" + params.extension)); + EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin")); + assertBrokenLogRemoved("./logs", "changelog_21_40.bin"); + EXPECT_TRUE(fs::exists("./logs/changelog_20_39.bin")); auto entry = getLogEntry("hello_world", 7777); changelog_reader.append(entry); changelog_reader.end_of_append_batch(0, 0); From 5309dc05ef44da063ad095b91776dbfb3e849fb0 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Wed, 15 Nov 2023 07:42:19 +0000 Subject: [PATCH 4/4] Fix test --- tests/integration/test_keeper_four_word_command/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_keeper_four_word_command/test.py b/tests/integration/test_keeper_four_word_command/test.py index 71501133ae7..84dd2a2fd93 100644 --- a/tests/integration/test_keeper_four_word_command/test.py +++ b/tests/integration/test_keeper_four_word_command/test.py @@ -287,7 +287,7 @@ def test_cmd_conf(started_cluster): assert result["quorum_reads"] == "false" assert result["force_sync"] == "true" - assert result["compress_logs"] == "true" + assert result["compress_logs"] == "false" assert result["compress_snapshots_with_zstd_format"] == "true" assert result["configuration_change_tries_count"] == "20"