Merge pull request #34534 from bigo-sg/changelogAsyncDel

Keeper changelog async delete useless logs
This commit is contained in:
alesapin 2022-02-23 10:30:23 +03:00 committed by GitHub
commit 9d21911ca7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 44 additions and 1 deletions

View File

@ -293,6 +293,8 @@ Changelog::Changelog(
if (existing_changelogs.empty())
LOG_WARNING(log, "No logs exists in {}. It's Ok if it's the first run of clickhouse-keeper.", changelogs_dir);
clean_log_thread = ThreadFromGlobalPool([this] { cleanLogThread(); });
}
void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uint64_t logs_to_keep)
@ -581,7 +583,17 @@ void Changelog::compact(uint64_t up_to_log_index)
}
LOG_INFO(log, "Removing changelog {} because of compaction", itr->second.path);
std::filesystem::remove(itr->second.path);
/// If failed to push to queue for background removing, then we will remove it now
if (!log_files_to_delete_queue.tryPush(itr->second.path, 1))
{
std::error_code ec;
std::filesystem::remove(itr->second.path, ec);
if (ec)
LOG_WARNING(log, "Failed to remove changelog {} in compaction, error message: {}", itr->second.path, ec.message());
else
LOG_INFO(log, "Removed changelog {} because of compaction", itr->second.path);
}
itr = existing_changelogs.erase(itr);
}
else /// Files are ordered, so all subsequent should exist
@ -705,6 +717,9 @@ Changelog::~Changelog()
try
{
flush();
log_files_to_delete_queue.finish();
if (clean_log_thread.joinable())
clean_log_thread.join();
}
catch (...)
{
@ -712,4 +727,20 @@ Changelog::~Changelog()
}
}
void Changelog::cleanLogThread()
{
while (!log_files_to_delete_queue.isFinishedAndEmpty())
{
std::string path;
if (log_files_to_delete_queue.tryPop(path))
{
std::error_code ec;
if (std::filesystem::remove(path, ec))
LOG_INFO(log, "Removed changelog {} because of compaction.", path);
else
LOG_WARNING(log, "Failed to remove changelog {} in compaction, error message: {}", path, ec.message());
}
}
}
}

View File

@ -7,6 +7,7 @@
#include <IO/HashingWriteBuffer.h>
#include <IO/CompressionMethod.h>
#include <Disks/IDisk.h>
#include <Common/ConcurrentBoundedQueue.h>
namespace DB
{
@ -142,6 +143,9 @@ private:
/// Init writer for existing log with some entries already written
void initWriter(const ChangelogFileDescription & description);
/// Clean useless log files in a background thread
void cleanLogThread();
private:
const std::string changelogs_dir;
const uint64_t rotate_interval;
@ -160,6 +164,10 @@ private:
/// min_log_id + 1 == max_log_id means empty log storage for NuRaft
uint64_t min_log_id = 0;
uint64_t max_log_id = 0;
/// For compaction, queue of delete not used logs
/// 128 is enough, even if log is not removed, it's not a problem
ConcurrentBoundedQueue<std::string> log_files_to_delete_queue{128};
ThreadFromGlobalPool clean_log_thread;
};
}

View File

@ -1,3 +1,4 @@
#include <chrono>
#include <gtest/gtest.h>
#include "config_core.h"
@ -406,6 +407,7 @@ TEST_P(CoordinationTest, ChangelogTestCompaction)
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
changelog.compact(6);
std::this_thread::sleep_for(std::chrono::microseconds(200));
EXPECT_FALSE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
@ -1462,6 +1464,7 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges)
}
changelog_2.compact(105);
std::this_thread::sleep_for(std::chrono::microseconds(200));
EXPECT_FALSE(fs::exists("./logs/changelog_1_100.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_101_110.bin" + params.extension));
@ -1481,6 +1484,7 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges)
}
changelog_3.compact(125);
std::this_thread::sleep_for(std::chrono::microseconds(200));
EXPECT_FALSE(fs::exists("./logs/changelog_101_110.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_111_117.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_118_124.bin" + params.extension));