ClickHouse/src/Storages/MergeTree/MergeTreeDeduplicationLog.cpp

241 lines
6.9 KiB
C++
Raw Normal View History

2021-03-31 15:20:30 +00:00
#include <Storages/MergeTree/MergeTreeDeduplicationLog.h>
#include <filesystem>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/trim.hpp>
2021-04-02 12:37:42 +00:00
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
2021-03-31 15:20:30 +00:00
namespace DB
{
namespace
{
2021-04-02 11:46:42 +00:00
enum class MergeTreeDeduplicationOp : uint8_t
{
ADD = 1,
DROP = 2,
};
struct MergeTreeDeduplicationLogRecord
{
MergeTreeDeduplicationOp operation;
std::string part_name;
std::string block_id;
};
void writeRecord(const MergeTreeDeduplicationLogRecord & record, WriteBuffer & out)
{
writeIntText(static_cast<uint8_t>(record.operation), out);
writeChar(' ', out);
2021-04-02 12:37:42 +00:00
writeStringBinary(record.part_name, out);
2021-04-02 11:46:42 +00:00
writeChar(' ', out);
2021-04-02 12:37:42 +00:00
writeStringBinary(record.block_id, out);
2021-04-02 11:46:42 +00:00
writeChar('\n', out);
}
void readRecord(MergeTreeDeduplicationLogRecord & record, ReadBuffer & in)
{
uint8_t op;
readIntText(op, in);
record.operation = static_cast<MergeTreeDeduplicationOp>(op);
assertChar(' ', in);
readStringBinary(record.part_name, in);
assertChar(' ', in);
readStringBinary(record.block_id, in);
assertChar('\n', in);
}
2021-03-31 15:20:30 +00:00
std::string getLogPath(const std::string & prefix, size_t number)
{
std::filesystem::path path(prefix);
path /= std::filesystem::path(std::string{"deduplication_log_"} + std::to_string(number) + ".txt");
return path;
}
size_t getLogNumber(const std::string & path_str)
{
std::filesystem::path path(path_str);
std::string filename = path.stem();
Strings filename_parts;
boost::split(filename_parts, filename, boost::is_any_of("_"));
return parse<size_t>(filename_parts[2]);
}
}
MergeTreeDeduplicationLog::MergeTreeDeduplicationLog(
const std::string & logs_dir_,
size_t deduplication_window_,
2021-04-02 11:46:42 +00:00
const MergeTreeDataFormatVersion & format_version_)
2021-03-31 15:20:30 +00:00
: logs_dir(logs_dir_)
, deduplication_window(deduplication_window_)
2021-04-02 11:46:42 +00:00
, rotate_interval(deduplication_window_ * 2) /// actually it doesn't matter
, format_version(format_version_)
, deduplication_map(deduplication_window)
2021-03-31 15:20:30 +00:00
{}
void MergeTreeDeduplicationLog::load()
{
namespace fs = std::filesystem;
if (!fs::exists(logs_dir))
fs::create_directories(logs_dir);
for (const auto & p : fs::directory_iterator(logs_dir))
{
auto path = p.path();
auto log_number = getLogNumber(path);
2021-04-02 11:46:42 +00:00
existing_logs[log_number] = {path, 0};
2021-03-31 15:20:30 +00:00
}
2021-04-02 11:46:42 +00:00
/// Order important
for (auto & [log_number, desc] : existing_logs)
{
desc.entries_count = loadSingleLog(desc.path);
current_log_number = log_number;
}
rotateAndDropIfNeeded();
2021-03-31 15:20:30 +00:00
}
2021-04-02 11:46:42 +00:00
size_t MergeTreeDeduplicationLog::loadSingleLog(const std::string & path)
2021-03-31 15:20:30 +00:00
{
ReadBufferFromFile read_buf(path);
2021-04-02 11:46:42 +00:00
size_t total_entries = 0;
2021-03-31 15:20:30 +00:00
while (!read_buf.eof())
{
2021-04-02 11:46:42 +00:00
MergeTreeDeduplicationLogRecord record;
readRecord(record, read_buf);
if (record.operation == MergeTreeDeduplicationOp::DROP)
deduplication_map.erase(record.block_id);
else
deduplication_map.insert(record.block_id, MergeTreePartInfo::fromPartName(record.part_name, format_version));
total_entries++;
2021-03-31 15:20:30 +00:00
}
2021-04-02 11:46:42 +00:00
return total_entries;
2021-03-31 15:20:30 +00:00
}
void MergeTreeDeduplicationLog::rotate()
{
2021-04-02 11:46:42 +00:00
current_log_number++;
auto new_path = getLogPath(logs_dir, current_log_number);
MergeTreeDeduplicationLogNameDescription log_description{new_path, 0};
existing_logs.emplace(current_log_number, log_description);
if (current_writer)
current_writer->sync();
current_writer = std::make_unique<WriteBufferFromFile>(log_description.path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
}
void MergeTreeDeduplicationLog::dropOutdatedLogs()
{
size_t current_sum = 0;
2021-04-02 12:37:42 +00:00
size_t remove_from_value = 0;
for (auto itr = existing_logs.rbegin(); itr != existing_logs.rend(); ++itr)
2021-04-02 11:46:42 +00:00
{
auto & description = itr->second;
if (current_sum > deduplication_window)
{
2021-04-02 12:37:42 +00:00
//std::filesystem::remove(description.path);
//itr = existing_logs.erase(itr);
remove_from_value = itr->first;
break;
2021-04-02 11:46:42 +00:00
}
2021-04-02 12:37:42 +00:00
current_sum += description.entries_count;
++itr;
}
if (remove_from_value != 0)
{
for (auto itr = existing_logs.begin(); itr != existing_logs.end();)
2021-04-02 11:46:42 +00:00
{
}
}
2021-04-02 12:37:42 +00:00
2021-04-02 11:46:42 +00:00
}
void MergeTreeDeduplicationLog::rotateAndDropIfNeeded()
{
if (existing_logs.empty() || existing_logs[current_log_number].entries_count > rotate_interval)
{
rotate();
dropOutdatedLogs();
}
}
std::pair<MergeTreePartInfo, bool> MergeTreeDeduplicationLog::addPart(const std::string & block_id, const MergeTreeData::MutableDataPartPtr & part)
{
std::lock_guard lock(state_mutex);
2021-03-31 15:20:30 +00:00
2021-04-02 11:46:42 +00:00
if (deduplication_map.contains(block_id))
return std::make_pair(deduplication_map->get(block_id), false);
2021-03-31 15:20:30 +00:00
2021-04-02 11:46:42 +00:00
assert(current_writer != nullptr);
MergeTreeDeduplicationLogRecord record;
record.operation = MergeTreeDeduplicationOp::ADD;
record.part_name = part->name;
record.block_id = block_id;
writeRecord(record, *current_writer);
existing_logs[current_log_number].entries_count++;
deduplication_map.insert(record.block_id, part->info);
return std::make_pair(part->info, true);
}
std::pair<MergeTreePartInfo, bool> MergeTreeDeduplicationLog::addPart(const MergeTreeData::MutableDataPartPtr & part)
{
return addPart(part->getZeroLevelPartBlockID(), part);
}
void MergeTreeDeduplicationLog::dropPart(const MergeTreeData::DataPartPtr & part)
{
std::lock_guard lock(state_mutex);
assert(current_writer != nullptr);
for (auto itr = deduplication_map.begin(); itr != deduplication_map.end(); ++itr)
{
const auto & part_info = itr->second;
if (part.contains(part_info))
{
record.operation = MergeTreeDeduplicationOp::DROP;
record.part_name = part_info.getPartName();
record.block_id = itr->first;
writeRecord(record, *current_writer);
existing_logs[current_log_number].entries_count++;
deduplication_map.erase(itr->first);
rotateAndDropIfNeeded();
}
}
}
void MergeTreeDeduplicationLog::dropPartition(const std::string & partition_id)
{
std::lock_guard lock(state_mutex);
assert(current_writer != nullptr);
for (auto itr = deduplication_map.begin(); itr != deduplication_map.end(); ++itr)
{
const auto & part_info = itr->second;
if (part_info.partition_id == partition_id)
{
record.operation = MergeTreeDeduplicationOp::DROP;
record.part_name = part_info.getPartName();
record.block_id = itr->first;
writeRecord(record, *current_writer);
deduplication_map.erase(itr->first);
existing_logs[current_log_number].entries_count++;
rotateAndDropIfNeeded();
}
}
2021-03-31 15:20:30 +00:00
}
}