Merge pull request #37565 from ClickHouse/keeper-broken-to-detached

Move clickhouse-keeper broken logs to specific folder
This commit is contained in:
Antonio Andelic 2022-06-14 08:23:17 +02:00 committed by GitHub
commit 71e4bd0689
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 128 additions and 36 deletions

View File

@ -36,21 +36,20 @@ std::string formatChangelogPath(const std::string & prefix, const ChangelogFileD
return path;
}
/// Parses a changelog file path whose name has the form
/// `<prefix>_<from_log_index>_<to_log_index><extension>` into a ChangelogFileDescription.
///
/// The stem of the path (file name without its last extension) is split on `_`;
/// the first three parts become prefix, from_log_index and to_log_index.
/// The extension and the generic (forward-slash) form of the whole path are stored as well.
///
/// Throws Exception with ErrorCodes::CORRUPTED_DATA if the stem has fewer than three parts.
ChangelogFileDescription getChangelogFileDescription(const std::filesystem::path & path)
{
    std::string filename = path.stem();
    Strings filename_parts;
    boost::split(filename_parts, filename, boost::is_any_of("_"));
    if (filename_parts.size() < 3)
        throw Exception(ErrorCodes::CORRUPTED_DATA, "Invalid changelog {}", path.generic_string());

    ChangelogFileDescription result;
    result.prefix = filename_parts[0];
    result.from_log_index = parse<uint64_t>(filename_parts[1]);
    result.to_log_index = parse<uint64_t>(filename_parts[2]);
    result.extension = path.extension();
    result.path = path.generic_string();
    return result;
}
@ -276,6 +275,7 @@ Changelog::Changelog(
Poco::Logger * log_,
bool compress_logs_)
: changelogs_dir(changelogs_dir_)
, changelogs_detached_dir(changelogs_dir / "detached")
, rotate_interval(rotate_interval_)
, force_sync(force_sync_)
, log(log_)
@ -288,12 +288,15 @@ Changelog::Changelog(
for (const auto & p : fs::directory_iterator(changelogs_dir))
{
if (p == changelogs_detached_dir)
continue;
auto file_description = getChangelogFileDescription(p.path());
existing_changelogs[file_description.from_log_index] = file_description;
}
if (existing_changelogs.empty())
LOG_WARNING(log, "No logs exists in {}. It's Ok if it's the first run of clickhouse-keeper.", changelogs_dir);
LOG_WARNING(log, "No logs exists in {}. It's Ok if it's the first run of clickhouse-keeper.", changelogs_dir.generic_string());
clean_log_thread = ThreadFromGlobalPool([this] { cleanLogThread(); });
}
@ -328,7 +331,7 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
/// entries from leader.
if (changelog_description.from_log_index > last_commited_log_index && (changelog_description.from_log_index - last_commited_log_index) > 1)
{
LOG_ERROR(log, "Some records was lost, last committed log index {}, smallest available log index on disk {}. Hopefully will receive missing records from leader.", last_commited_log_index, changelog_description.from_log_index);
LOG_ERROR(log, "Some records were lost, last committed log index {}, smallest available log index on disk {}. Hopefully will receive missing records from leader.", last_commited_log_index, changelog_description.from_log_index);
/// Nothing to do with our more fresh log, leader will overwrite them, so remove everything and just start from last_commited_index
removeAllLogs();
min_log_id = last_commited_log_index;
@ -342,6 +345,12 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
LOG_WARNING(log, "Don't have required amount of reserved log records. Need to read from {}, smallest available log index on disk {}.", start_to_read_from, changelog_description.from_log_index);
}
}
else if ((changelog_description.from_log_index - last_log_read_result->last_read_index) > 1)
{
LOG_ERROR(log, "Some records were lost, last found log index {}, while the next log index on disk is {}. Hopefully will receive missing records from leader.", last_log_read_result->last_read_index, changelog_description.from_log_index);
removeAllLogsAfter(last_log_read_result->log_start_index);
break;
}
ChangelogReader reader(changelog_description.path);
last_log_read_result = reader.readChangelog(logs, start_to_read_from, log);
@ -431,6 +440,44 @@ void Changelog::initWriter(const ChangelogFileDescription & description)
current_writer = std::make_unique<ChangelogWriter>(description.path, WriteMode::Append, description.from_log_index);
}
namespace
{
std::string getCurrentTimestampFolder()
{
const auto timestamp = LocalDateTime{std::time(nullptr)};
return fmt::format(
"{:02}{:02}{:02}T{:02}{:02}{:02}",
timestamp.year(),
timestamp.month(),
timestamp.day(),
timestamp.hour(),
timestamp.minute(),
timestamp.second());
}
}
/// Moves the changelog files described by [begin, end) into
/// `changelogs_detached_dir / <current timestamp folder>` and erases the corresponding
/// entries from existing_changelogs.
///
/// Both iterators must point into existing_changelogs, because entries are erased from that map.
/// The timestamp folder is created on demand; nothing is created when the range is empty.
void Changelog::removeExistingLogs(ChangelogIter begin, ChangelogIter end)
{
    /// Nothing to move, so do not leave an empty timestamp folder behind.
    if (begin == end)
        return;

    const auto timestamp_folder = changelogs_detached_dir / getCurrentTimestampFolder();

    /// The destination is the same for every file, so prepare it once instead of re-checking per file.
    if (!std::filesystem::exists(timestamp_folder))
    {
        LOG_WARNING(log, "Moving broken logs to {}", timestamp_folder.generic_string());
        std::filesystem::create_directories(timestamp_folder);
    }

    for (auto itr = begin; itr != end;)
    {
        LOG_WARNING(log, "Removing changelog {}", itr->second.path);
        const std::filesystem::path path = itr->second.path;
        const auto new_path = timestamp_folder / path.filename();
        std::filesystem::rename(path, new_path);
        /// erase() returns the iterator following the removed element, which keeps the loop valid.
        itr = existing_changelogs.erase(itr);
    }
}
void Changelog::removeAllLogsAfter(uint64_t remove_after_log_start_index)
{
auto start_to_remove_from_itr = existing_changelogs.upper_bound(remove_after_log_start_index);
@ -440,12 +487,8 @@ void Changelog::removeAllLogsAfter(uint64_t remove_after_log_start_index)
size_t start_to_remove_from_log_id = start_to_remove_from_itr->first;
/// All subsequent logs shouldn't exist. But they may exist if we crashed after writeAt started. Remove them.
for (auto itr = start_to_remove_from_itr; itr != existing_changelogs.end();)
{
LOG_WARNING(log, "Removing changelog {}, because it's goes after broken changelog entry", itr->second.path);
std::filesystem::remove(itr->second.path);
itr = existing_changelogs.erase(itr);
}
LOG_WARNING(log, "Removing changelogs that go after broken changelog entry");
removeExistingLogs(start_to_remove_from_itr, existing_changelogs.end());
std::erase_if(logs, [start_to_remove_from_log_id] (const auto & item) { return item.first >= start_to_remove_from_log_id; });
}
@ -453,12 +496,7 @@ void Changelog::removeAllLogsAfter(uint64_t remove_after_log_start_index)
/// Detaches every changelog file currently tracked in existing_changelogs (see removeExistingLogs)
/// and clears all in-memory log entries.
///
/// The files are handed to removeExistingLogs rather than deleted with std::filesystem::remove,
/// so they end up in the detached folder instead of being lost.
void Changelog::removeAllLogs()
{
    LOG_WARNING(log, "Removing all changelogs");
    removeExistingLogs(existing_changelogs.begin(), existing_changelogs.end());
    logs.clear();
}

View File

@ -138,6 +138,13 @@ private:
/// Starts new file [new_start_log_index, new_start_log_index + rotate_interval]
void rotate(uint64_t new_start_log_index);
/// Currently existing changelogs
std::map<uint64_t, ChangelogFileDescription> existing_changelogs;
/// Iterator over the existing_changelogs map.
using ChangelogIter = decltype(existing_changelogs)::iterator;
/// Moves the changelog files in [begin, end) into a timestamped subfolder of the detached
/// directory and erases their entries from existing_changelogs.
void removeExistingLogs(ChangelogIter begin, ChangelogIter end);
/// NOTE(review): no definition or caller of removeLog is visible in this view — confirm it is
/// defined and used, otherwise this declaration is dead.
static void removeLog(const std::filesystem::path & path, const std::filesystem::path & detached_folder);
/// Remove all changelogs from disk with start_index bigger than start_to_remove_from_id
void removeAllLogsAfter(uint64_t remove_after_log_start_index);
/// Remove all logs from disk
@ -148,14 +155,13 @@ private:
/// Clean useless log files in a background thread
void cleanLogThread();
const std::string changelogs_dir;
const std::filesystem::path changelogs_dir;
const std::filesystem::path changelogs_detached_dir;
const uint64_t rotate_interval;
const bool force_sync;
Poco::Logger * log;
bool compress_logs;
/// Currently existing changelogs
std::map<uint64_t, ChangelogFileDescription> existing_changelogs;
/// Current writer for changelog file
std::unique_ptr<ChangelogWriter> current_writer;

View File

@ -698,13 +698,32 @@ TEST_P(CoordinationTest, ChangelogTestStartNewLogAfterRead)
EXPECT_TRUE(fs::exists("./logs/changelog_36_40.bin" + params.extension));
}
namespace
{

/// Checks that `filename` no longer exists directly in `log_folder` and that a file with the
/// same name exists somewhere under `log_folder / "detached"`.
/// Records a test failure (and returns) if the detached folder is missing or holds no such file.
void assertBrokenLogRemoved(const fs::path & log_folder, const fs::path & filename)
{
    EXPECT_FALSE(fs::exists(log_folder / filename));

    const fs::path detached_folder = log_folder / "detached";

    /// recursive_directory_iterator throws on a missing directory; report a readable failure instead.
    if (!fs::is_directory(detached_folder))
    {
        FAIL() << "Broken log " << filename << " was not moved: folder " << detached_folder << " does not exist";
    }

    // broken logs are sent to the detached/{timestamp} folder
    // we don't know timestamp so we iterate all of them
    for (const auto & dir_entry : fs::recursive_directory_iterator(detached_folder))
    {
        if (dir_entry.path().filename() == filename)
            return;
    }

    FAIL() << "Broken log " << filename << " was not moved to the detached folder";
}

}
TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
static const fs::path log_folder{"./logs"};
DB::KeeperLogStore changelog("./logs", 5, true, params.enable_compression);
auto params = GetParam();
ChangelogDirTest test(log_folder);
DB::KeeperLogStore changelog(log_folder, 5, true, params.enable_compression);
changelog.init(1, 0);
for (size_t i = 0; i < 35; ++i)
@ -736,10 +755,10 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin" + params.extension));
assertBrokenLogRemoved(log_folder, "changelog_16_20.bin" + params.extension);
assertBrokenLogRemoved(log_folder, "changelog_21_25.bin" + params.extension);
assertBrokenLogRemoved(log_folder, "changelog_26_30.bin" + params.extension);
assertBrokenLogRemoved(log_folder, "changelog_31_35.bin" + params.extension);
auto entry = getLogEntry("h", 7777);
changelog_reader.append(entry);
@ -751,10 +770,10 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin" + params.extension));
assertBrokenLogRemoved(log_folder, "changelog_16_20.bin" + params.extension);
assertBrokenLogRemoved(log_folder, "changelog_21_25.bin" + params.extension);
assertBrokenLogRemoved(log_folder, "changelog_26_30.bin" + params.extension);
assertBrokenLogRemoved(log_folder, "changelog_31_35.bin" + params.extension);
DB::KeeperLogStore changelog_reader2("./logs", 5, true, params.enable_compression);
changelog_reader2.init(1, 0);
@ -788,14 +807,13 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
EXPECT_EQ(changelog_reader.size(), 0);
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_21_40.bin" + params.extension));
assertBrokenLogRemoved("./logs", "changelog_21_40.bin" + params.extension);
auto entry = getLogEntry("hello_world", 7777);
changelog_reader.append(entry);
changelog_reader.end_of_append_batch(0, 0);
EXPECT_EQ(changelog_reader.size(), 1);
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 7777);
DB::KeeperLogStore changelog_reader2("./logs", 1, true, params.enable_compression);
changelog_reader2.init(1, 0);
EXPECT_EQ(changelog_reader2.size(), 1);
@ -825,10 +843,40 @@ TEST_P(CoordinationTest, ChangelogTestLostFiles)
DB::KeeperLogStore changelog_reader("./logs", 20, true, params.enable_compression);
/// It should print error message, but still able to start
changelog_reader.init(5, 0);
EXPECT_FALSE(fs::exists("./logs/changelog_1_20.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_21_40.bin" + params.extension));
assertBrokenLogRemoved("./logs", "changelog_21_40.bin" + params.extension);
}
/// Writes 35 entries with a rotation interval of 10, deletes the third changelog file to create
/// a gap, then re-opens the store and checks that the files before the gap are kept while the
/// file after the gap is moved to the detached folder.
TEST_P(CoordinationTest, ChangelogTestLostFiles2)
{
    auto params = GetParam();
    ChangelogDirTest test("./logs");

    DB::KeeperLogStore writer_store("./logs", 10, true, params.enable_compression);
    writer_store.init(1, 0);

    size_t idx = 0;
    while (idx < 35)
    {
        auto record = getLogEntry(std::to_string(idx) + "_hello_world", (idx + 44) * 10);
        writer_store.append(record);
        ++idx;
    }
    writer_store.end_of_append_batch(0, 0);

    /// All four rotated files must be on disk before the gap is introduced.
    EXPECT_TRUE(fs::exists("./logs/changelog_1_10.bin" + params.extension));
    EXPECT_TRUE(fs::exists("./logs/changelog_11_20.bin" + params.extension));
    EXPECT_TRUE(fs::exists("./logs/changelog_21_30.bin" + params.extension));
    EXPECT_TRUE(fs::exists("./logs/changelog_31_40.bin" + params.extension));

    /// Create a gap in the logs; every log after the gap has to be removed on the next start.
    fs::remove("./logs/changelog_21_30.bin" + params.extension);

    DB::KeeperLogStore reopened_store("./logs", 10, true, params.enable_compression);
    /// It should print error message, but still able to start
    reopened_store.init(5, 0);

    EXPECT_TRUE(fs::exists("./logs/changelog_1_10.bin" + params.extension));
    EXPECT_TRUE(fs::exists("./logs/changelog_11_20.bin" + params.extension));
    assertBrokenLogRemoved("./logs", "changelog_31_40.bin" + params.extension);
}
struct IntNode
{
int value;