diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 7c9f7b8104d..d59d877b372 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -1349,6 +1349,24 @@ String IMergeTreeDataPart::getUniqueId() const return id; } + +String IMergeTreeDataPart::getZeroLevelPartBlockID() const +{ + if (info.level != 0) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get block id for non zero level part {}", name); + + SipHash hash; + checksums.computeTotalChecksumDataOnly(hash); + union + { + char bytes[16]; + UInt64 words[2]; + } hash_value; + hash.get128(hash_value.bytes); + + return info.partition_id + "_" + toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]); +} + bool isCompactPart(const MergeTreeDataPartPtr & data_part) { return (data_part && data_part->getType() == MergeTreeDataPartType::COMPACT); @@ -1364,5 +1382,6 @@ bool isInMemoryPart(const MergeTreeDataPartPtr & data_part) return (data_part && data_part->getType() == MergeTreeDataPartType::IN_MEMORY); } + } diff --git a/src/Storages/MergeTree/MergeTreeDeduplicationLog.cpp b/src/Storages/MergeTree/MergeTreeDeduplicationLog.cpp new file mode 100644 index 00000000000..7c6f6f6db9a --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeDeduplicationLog.cpp @@ -0,0 +1,78 @@ +#include +#include +#include +#include +#include + + + +namespace DB +{ + +namespace +{ + +std::string getLogPath(const std::string & prefix, size_t number) +{ + std::filesystem::path path(prefix); + path /= std::filesystem::path(std::string{"deduplication_log_"} + std::to_string(number) + ".txt"); + return path; +} + +size_t getLogNumber(const std::string & path_str) +{ + std::filesystem::path path(path_str); + std::string filename = path.stem(); + Strings filename_parts; + boost::split(filename_parts, filename, boost::is_any_of("_")); + + return parse(filename_parts[2]); +} + +} + +MergeTreeDeduplicationLog::MergeTreeDeduplicationLog( + const std::string & logs_dir_, + size_t deduplication_window_, + size_t rotate_interval_) + : logs_dir(logs_dir_) + , deduplication_window(deduplication_window_) + , rotate_interval(rotate_interval_) +{} + +void MergeTreeDeduplicationLog::load() +{ + namespace fs = std::filesystem; + if (!fs::exists(logs_dir)) + fs::create_directories(logs_dir); + + for (const auto & p : fs::directory_iterator(logs_dir)) + { + auto path = p.path(); + auto log_number = getLogNumber(path); + existing_logs[log_description.log_number] = {path, 0}; + } +} + +std::unordered_set MergeTreeDeduplicationLog::loadSingleLog(const std::string & path) +{ + ReadBufferFromFile read_buf(path); + + while (!read_buf.eof()) + { + readIntBinary(record_checksum, read_buf); + } +} + +void MergeTreeDeduplicationLog::rotate() +{ + size_t new_log_number = log_counter++; + auto new_description = getLogDescription(logs_dir, new_log_number, rotate_interval); + existing_logs.emplace(new_log_number, new_description); + current_writer->sync(); + + current_writer = std::make_unique(description.path, WriteMode::Append, description.from_log_index); + +} + +} diff --git a/src/Storages/MergeTree/MergeTreeDeduplicationLog.h b/src/Storages/MergeTree/MergeTreeDeduplicationLog.h new file mode 100644 index 00000000000..140e1c80be7 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeDeduplicationLog.h @@ -0,0 +1,57 @@ +#pragma once +#include +#include +#include + +namespace DB +{ + +enum class MergeTreeDeduplicationOp : uint8_t +{ + ADD = 1, + DROP = 2, +}; + +struct MergeTreeDeduplicationLogRecord +{ + MergeTreeDeduplicationOp operation; + std::string part_name; + std::string block_id; +}; + +struct MergeTreeDeduplicationLogNameDescription +{ + std::string path; + size_t entries_count; +}; + +class MergeTreeDeduplicationLog +{ +public: + MergeTreeDeduplicationLog( + const std::string & logs_dir_, + size_t deduplication_window_, + size_t rotate_interval_); + + bool addPart(const MergeTreeData::MutableDataPartPtr & part); + void dropPart(const MergeTreeData::MutableDataPartPtr & part); + void dropPartition(const std::string & partition_id); + + void load(); +private: + const std::string logs_dir; + const size_t deduplication_window; + const size_t rotate_interval; + size_t log_counter = 1; + std::map existing_logs; + + std::unordered_set deduplication_set; + + std::unique_ptr current_writer; + size_t entries_written_in_current_file; + + void rotate(); + std::unordered_set loadSingleLog(const std::string & path) const; +}; + +}