diff --git a/src/Coordination/Changelog.cpp b/src/Coordination/Changelog.cpp index b0a786665b4..cadb86044ad 100644 --- a/src/Coordination/Changelog.cpp +++ b/src/Coordination/Changelog.cpp @@ -284,11 +284,18 @@ void Changelog::readChangelogAndInitWriter(size_t last_commited_log_index, size_ { if (!started) { - if (changelog_description.from_log_index > start_to_read_from) - throw Exception(ErrorCodes::CORRUPTED_DATA, "Cannot read changelog from index {}, smallest available index {}", start_index, changelog_description.from_log_index); - started = true; + if (changelog_description.from_log_index > last_commited_log_index && (changelog_description.from_log_index - last_commited_log_index) > 1) + { + LOG_ERROR(log, "Some records was lost, last commited log index {}, smallest available log index on disk {}. Hopefully will receive missing records from leader.", last_commited_log_index, changelog_description.from_log_index); + incomplete_log_index = changelog_start_index; + break; + } + else if (changelog_description.from_log_index > start_to_read_from) + LOG_WARNING(log, "Don't have required amount of reserved log records. Need to read from {}, smalled available log index on disk {}.", start_to_read_from, changelog_description.from_log_index); } + started = true; + ChangelogReader reader(changelog_description.path); result = reader.readChangelog(logs, start_to_read_from, index_to_start_pos, log); if (first_read_index == 0) @@ -312,8 +319,12 @@ void Changelog::readChangelogAndInitWriter(size_t last_commited_log_index, size_ if (incomplete_log_index != 0) { + auto start_remove_from = existing_changelogs.begin(); + if (started) + start_remove_from = existing_changelogs.upper_bound(incomplete_log_index); + /// All subsequent logs shouldn't exist. But they may exist if we crashed after writeAt started. Remove them. - for (auto itr = existing_changelogs.upper_bound(incomplete_log_index); itr != existing_changelogs.end();) + for (auto itr = start_remove_from; itr != existing_changelogs.end();) { LOG_WARNING(log, "Removing changelog {}, because it's goes after broken changelog entry", itr->second.path); std::filesystem::remove(itr->second.path); diff --git a/src/Coordination/tests/gtest_for_build.cpp b/src/Coordination/tests/gtest_for_build.cpp index 8cc65c001be..126ff7b8533 100644 --- a/src/Coordination/tests/gtest_for_build.cpp +++ b/src/Coordination/tests/gtest_for_build.cpp @@ -764,10 +764,10 @@ TEST(CoordinationTest, ChangelogTestLostFiles) fs::remove("./logs/changelog_1_20.bin"); DB::NuKeeperLogStore changelog_reader("./logs", 20, true); - EXPECT_THROW(changelog_reader.init(5, 0), DB::Exception); - - fs::remove("./logs/changelog_21_40.bin"); - EXPECT_THROW(changelog_reader.init(3, 0), DB::Exception); + /// It should print error message, but still able to start + changelog_reader.init(5, 0); + EXPECT_FALSE(fs::exists("./logs/changelog_1_20.bin")); + EXPECT_FALSE(fs::exists("./logs/changelog_21_40.bin")); } TEST(CoordinationTest, SnapshotableHashMapSimple)