diff --git a/src/Coordination/Changelog.cpp b/src/Coordination/Changelog.cpp index 4f095974836..9e1ed557430 100644 --- a/src/Coordination/Changelog.cpp +++ b/src/Coordination/Changelog.cpp @@ -39,21 +39,15 @@ namespace static constexpr auto DEFAULT_PREFIX = "changelog"; -struct ChangelogName -{ - std::string prefix; - size_t from_log_idx; - size_t to_log_idx; -}; - -std::string formatChangelogPath(const std::string & prefix, const ChangelogName & name) +std::string formatChangelogPath(const std::string & prefix, const ChangelogFileDescription & name) { std::filesystem::path path(prefix); path /= std::filesystem::path(name.prefix + "_" + std::to_string(name.from_log_idx) + "_" + std::to_string(name.to_log_idx) + ".bin"); return path; } -ChangelogName getChangelogName(const std::string & path_str) + +ChangelogFileDescription getChangelogFileDescription(const std::string & path_str) { std::filesystem::path path(path_str); std::string filename = path.stem(); @@ -62,10 +56,11 @@ ChangelogName getChangelogName(const std::string & path_str) if (filename_parts.size() < 3) throw Exception(ErrorCodes::CORRUPTED_DATA, "Invalid changelog {}", path_str); - ChangelogName result; + ChangelogFileDescription result; result.prefix = filename_parts[0]; result.from_log_idx = parse(filename_parts[1]); result.to_log_idx = parse(filename_parts[2]); + result.path = path_str; return result; } @@ -204,8 +199,8 @@ Changelog::Changelog(const std::string & changelogs_dir_, size_t rotate_interval for (const auto & p : fs::directory_iterator(changelogs_dir)) { - auto name = getChangelogName(p.path()); - existing_changelogs[name.from_log_idx] = p.path(); + auto file_description = getChangelogFileDescription(p.path()); + existing_changelogs[file_description.from_log_idx] = file_description; } } @@ -214,22 +209,40 @@ void Changelog::readChangelogAndInitWriter(size_t from_log_idx) size_t read_from_last = 0; start_index = from_log_idx == 0 ? 1 : from_log_idx; size_t total_read = 0; - for (const auto & [start_id, changelog_file] : existing_changelogs) + size_t entries_in_last = 0; + size_t incomplete_log_idx = 0; + for (const auto & [start_idx, changelog_description] : existing_changelogs) { - ChangelogName parsed_name = getChangelogName(changelog_file); - if (parsed_name.to_log_idx >= from_log_idx) + entries_in_last = changelog_description.to_log_idx - changelog_description.from_log_idx + 1; + + if (changelog_description.to_log_idx >= from_log_idx) { - ChangelogReader reader(changelog_file); + ChangelogReader reader(changelog_description.path); read_from_last = reader.readChangelog(logs, from_log_idx, index_to_start_pos); total_read += read_from_last; + + /// May happen after truncate and crash + if (read_from_last < entries_in_last) + { + incomplete_log_idx = start_idx; + break; + } } } - if (existing_changelogs.size() > 0 && read_from_last < rotate_interval) + if (incomplete_log_idx != 0) { - auto str_name = existing_changelogs.rbegin()->second; - auto parsed_name = getChangelogName(str_name); - current_writer = std::make_unique(str_name, WriteMode::Append, parsed_name.from_log_idx); + for (auto itr = existing_changelogs.upper_bound(incomplete_log_idx); itr != existing_changelogs.end();) + { + std::filesystem::remove(itr->second.path); + itr = existing_changelogs.erase(itr); + } + } + + if (existing_changelogs.size() > 0 && read_from_last < entries_in_last) + { + auto description = existing_changelogs.rbegin()->second; + current_writer = std::make_unique(description.path, WriteMode::Append, description.from_log_idx); current_writer->setEntriesWritten(read_from_last); } else @@ -243,14 +256,14 @@ void Changelog::rotate(size_t new_start_log_idx) if (current_writer) current_writer->flush(); - ChangelogName new_name; - new_name.prefix = DEFAULT_PREFIX; - new_name.from_log_idx = new_start_log_idx; - new_name.to_log_idx = new_start_log_idx + rotate_interval - 1; + ChangelogFileDescription new_description; + new_description.prefix = DEFAULT_PREFIX; + new_description.from_log_idx = new_start_log_idx; + new_description.to_log_idx = new_start_log_idx + rotate_interval - 1; - auto new_log_path = formatChangelogPath(changelogs_dir, new_name); - existing_changelogs[new_start_log_idx] = new_log_path; - current_writer = std::make_unique(new_log_path, WriteMode::Rewrite, new_start_log_idx); + new_description.path = formatChangelogPath(changelogs_dir, new_description); + existing_changelogs[new_start_log_idx] = new_description; + current_writer = std::make_unique(new_description.path, WriteMode::Rewrite, new_start_log_idx); } ChangelogRecord Changelog::buildRecord(size_t index, nuraft::ptr log_entry) const @@ -301,15 +314,14 @@ void Changelog::writeAt(size_t index, nuraft::ptr log_entry) if (need_rollback) { auto index_changelog = existing_changelogs.lower_bound(index); - std::string fname; + ChangelogFileDescription description; if (index_changelog->first == index) - fname = index_changelog->second; + description = index_changelog->second; else - fname = std::prev(index_changelog)->second; + description = std::prev(index_changelog)->second; - current_writer = std::make_unique(fname, WriteMode::Append, index_changelog->first); - auto formated_name = getChangelogName(fname); - current_writer->setEntriesWritten(formated_name.to_log_idx - formated_name.from_log_idx + 1); + current_writer = std::make_unique(description.path, WriteMode::Append, index_changelog->first); + current_writer->setEntriesWritten(description.to_log_idx - description.from_log_idx + 1); } auto entries_written = current_writer->getEntriesWritten(); @@ -320,7 +332,7 @@ void Changelog::writeAt(size_t index, nuraft::ptr log_entry) auto to_remove_itr = existing_changelogs.upper_bound(index); for (auto itr = to_remove_itr; itr != existing_changelogs.end();) { - std::filesystem::remove(itr->second); + std::filesystem::remove(itr->second.path); itr = existing_changelogs.erase(itr); } } @@ -342,17 +354,16 @@ void Changelog::compact(size_t up_to_log_idx) { for (auto itr = existing_changelogs.begin(); itr != existing_changelogs.end();) { - ChangelogName parsed_name = getChangelogName(itr->second); - if (parsed_name.to_log_idx <= up_to_log_idx) + if (itr->second.to_log_idx <= up_to_log_idx) { - for (size_t idx = parsed_name.from_log_idx; idx <= parsed_name.to_log_idx; ++idx) + for (size_t idx = itr->second.from_log_idx; idx <= itr->second.to_log_idx; ++idx) { auto index_pos = index_to_start_pos.find(idx); if (index_pos == index_to_start_pos.end()) break; index_to_start_pos.erase(index_pos); } - std::filesystem::remove(itr->second); + std::filesystem::remove(itr->second.path); itr = existing_changelogs.erase(itr); } else @@ -366,7 +377,6 @@ void Changelog::compact(size_t up_to_log_idx) LogEntryPtr Changelog::getLastEntry() const { - static LogEntryPtr fake_entry = nuraft::cs_new(0, nuraft::buffer::alloc(sizeof(size_t))); size_t next_idx = getNextEntryIndex() - 1; diff --git a/src/Coordination/Changelog.h b/src/Coordination/Changelog.h index 7c352e7a91b..e154c1c70c6 100644 --- a/src/Coordination/Changelog.h +++ b/src/Coordination/Changelog.h @@ -45,6 +45,15 @@ struct ChangelogRecord nuraft::ptr blob; }; +struct ChangelogFileDescription +{ + std::string prefix; + size_t from_log_idx; + size_t to_log_idx; + + std::string path; +}; + class ChangelogWriter; class Changelog @@ -98,7 +107,7 @@ private: private: std::string changelogs_dir; - std::map existing_changelogs; + std::map existing_changelogs; std::unique_ptr current_writer; IndexToOffset index_to_start_pos; const size_t rotate_interval; diff --git a/src/Coordination/tests/gtest_for_build.cpp b/src/Coordination/tests/gtest_for_build.cpp index 8328d93d9cf..76dd08a6d33 100644 --- a/src/Coordination/tests/gtest_for_build.cpp +++ b/src/Coordination/tests/gtest_for_build.cpp @@ -36,7 +36,9 @@ struct ChangelogDirTest , drop(drop_) { if (fs::exists(path)) + { EXPECT_TRUE(false) << "Path " << path << " already exists, remove it to run test"; + } fs::create_directory(path); } @@ -810,4 +812,59 @@ TEST(CoordinationTest, ChangelogTestStartNewLogAfterRead) EXPECT_TRUE(fs::exists("./logs/changelog_36_40.bin")); } + +TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate) +{ + ChangelogDirTest test("./logs"); + + DB::NuKeeperLogStore changelog("./logs", 5); + changelog.init(1); + + for (size_t i = 0; i < 35; ++i) + { + auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10); + changelog.append(entry); + } + EXPECT_EQ(changelog.size(), 35); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin")); + + DB::WriteBufferFromFile plain_buf("./logs/changelog_11_15.bin", DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY); + plain_buf.truncate(0); + + DB::NuKeeperLogStore changelog_reader("./logs", 5); + changelog_reader.init(1); + + EXPECT_EQ(changelog_reader.size(), 10); + EXPECT_EQ(changelog_reader.last_entry()->get_term(), 90); + + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin")); + + EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin")); + EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin")); + EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin")); + EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin")); + + auto entry = getLogEntry("h", 7777); + changelog_reader.append(entry); + EXPECT_EQ(changelog_reader.size(), 11); + EXPECT_EQ(changelog_reader.last_entry()->get_term(), 7777); + + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin")); + + EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin")); + EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin")); + EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin")); + EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin")); +} + #endif