diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 72ce05e7aab..71a99aa2a87 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -1356,6 +1356,24 @@ String IMergeTreeDataPart::getUniqueId() const return id; } + +String IMergeTreeDataPart::getZeroLevelPartBlockID() const +{ + if (info.level != 0) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get block id for non zero level part {}", name); + + SipHash hash; + checksums.computeTotalChecksumDataOnly(hash); + union + { + char bytes[16]; + UInt64 words[2]; + } hash_value; + hash.get128(hash_value.bytes); + + return info.partition_id + "_" + toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]); +} + bool isCompactPart(const MergeTreeDataPartPtr & data_part) { return (data_part && data_part->getType() == MergeTreeDataPartType::COMPACT); @@ -1372,4 +1390,3 @@ bool isInMemoryPart(const MergeTreeDataPartPtr & data_part) } } - diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index 03f6564788a..4e531826c98 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -164,6 +164,9 @@ public: bool isEmpty() const { return rows_count == 0; } + /// Compute the block id of a zero-level part: partition id plus both 64-bit words of the SipHash over the data checksums. Throws LOGICAL_ERROR for a part of any other level. 
+ String getZeroLevelPartBlockID() const; + const MergeTreeData & storage; String name; diff --git a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp index bb5644567ae..6c9e14f9796 100644 --- a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp @@ -35,12 +35,14 @@ void MergeTreeBlockOutputStream::write(const Block & block) if (!part) continue; - storage.renameTempPartAndAdd(part, &storage.increment); + /// Part can be deduplicated, so increment counters and add to part log only if it's really added + if (storage.renameTempPartAndAdd(part, &storage.increment, nullptr, storage.getDeduplicationLog())) + { + PartLog::addNewPart(storage.global_context, part, watch.elapsed()); - PartLog::addNewPart(storage.global_context, part, watch.elapsed()); - - /// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'. - storage.background_executor.triggerTask(); + /// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'. 
+ storage.background_executor.triggerTask(); + } } } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index d61de13b604..d1cd236162a 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -71,6 +71,7 @@ namespace ProfileEvents extern const Event RejectedInserts; extern const Event DelayedInserts; extern const Event DelayedInsertsMilliseconds; + extern const Event DuplicatedInsertedBlocks; } namespace CurrentMetrics @@ -2022,7 +2023,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace( } -bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction) +bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, MergeTreeDeduplicationLog * deduplication_log) { if (out_transaction && &out_transaction->data != this) throw Exception("MergeTreeData::Transaction for one table cannot be used with another. 
It is a bug.", @@ -2031,7 +2032,7 @@ bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrem DataPartsVector covered_parts; { auto lock = lockParts(); - if (!renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts)) + if (!renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts, deduplication_log)) return false; } if (!covered_parts.empty()) @@ -2044,7 +2045,7 @@ bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrem bool MergeTreeData::renameTempPartAndReplace( MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, - std::unique_lock & lock, DataPartsVector * out_covered_parts) + std::unique_lock & lock, DataPartsVector * out_covered_parts, MergeTreeDeduplicationLog * deduplication_log) { if (out_transaction && &out_transaction->data != this) throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.", @@ -2099,6 +2100,22 @@ bool MergeTreeData::renameTempPartAndReplace( return false; } + /// Deduplication log used only from non-replicated MergeTree. Replicated + /// tables have their own mechanism. We try to deduplicate at such deep + /// level, because only here we know real part name which is required for + /// deduplication. + if (deduplication_log) + { + String block_id = part->getZeroLevelPartBlockID(); + auto res = deduplication_log->addPart(block_id, part_info); + if (!res.second) + { + ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks); + LOG_INFO(log, "Block with ID {} already exists as part {}; ignoring it", block_id, res.first.getPartName()); + return false; + } + } + /// All checks are passed. Now we can rename the part on disk. 
/// So, we maintain invariant: if a non-temporary part in filesystem then it is in data_parts /// @@ -2155,7 +2172,7 @@ bool MergeTreeData::renameTempPartAndReplace( } MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( - MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction) + MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, MergeTreeDeduplicationLog * deduplication_log) { if (out_transaction && &out_transaction->data != this) throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.", @@ -2164,7 +2181,7 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( DataPartsVector covered_parts; { auto lock = lockParts(); - renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts); + renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts, deduplication_log); } return covered_parts; } diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 63d776a838c..1bf66f9bfde 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -54,6 +54,7 @@ struct CurrentlySubmergingEmergingTagger; class ExpressionActions; using ExpressionActionsPtr = std::shared_ptr; using ManyExpressionActions = std::vector; +class MergeTreeDeduplicationLog; namespace ErrorCodes { @@ -447,18 +448,18 @@ public: /// active set later with out_transaction->commit()). /// Else, commits the part immediately. /// Returns true if part was added. Returns false if part is covered by bigger part. 
- bool renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr); + bool renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr, MergeTreeDeduplicationLog * deduplication_log = nullptr); /// The same as renameTempPartAndAdd but the block range of the part can contain existing parts. /// Returns all parts covered by the added part (in ascending order). /// If out_transaction == nullptr, marks covered parts as Outdated. DataPartsVector renameTempPartAndReplace( - MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr); + MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr, MergeTreeDeduplicationLog * deduplication_log = nullptr); /// Low-level version of previous one, doesn't lock mutex bool renameTempPartAndReplace( MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, DataPartsLock & lock, - DataPartsVector * out_covered_parts = nullptr); + DataPartsVector * out_covered_parts = nullptr, MergeTreeDeduplicationLog * deduplication_log = nullptr); /// Remove parts from working set immediately (without wait for background diff --git a/src/Storages/MergeTree/MergeTreeDeduplicationLog.cpp b/src/Storages/MergeTree/MergeTreeDeduplicationLog.cpp new file mode 100644 index 00000000000..33960e2e1ff --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeDeduplicationLog.cpp @@ -0,0 +1,311 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace +{ + +/// Deduplication operation part was dropped or added +enum class MergeTreeDeduplicationOp : uint8_t +{ + ADD = 1, + DROP = 2, +}; + +/// Record for deduplication on disk +struct MergeTreeDeduplicationLogRecord +{ + MergeTreeDeduplicationOp operation; + std::string part_name; + std::string block_id; +}; 
+ +void writeRecord(const MergeTreeDeduplicationLogRecord & record, WriteBuffer & out) +{ + writeIntText(static_cast(record.operation), out); + writeChar('\t', out); + writeString(record.part_name, out); + writeChar('\t', out); + writeString(record.block_id, out); + writeChar('\n', out); + out.next(); +} + +void readRecord(MergeTreeDeduplicationLogRecord & record, ReadBuffer & in) +{ + uint8_t op; + readIntText(op, in); + record.operation = static_cast(op); + assertChar('\t', in); + readString(record.part_name, in); + assertChar('\t', in); + readString(record.block_id, in); + assertChar('\n', in); +} + + +std::string getLogPath(const std::string & prefix, size_t number) +{ + std::filesystem::path path(prefix); + path /= std::filesystem::path(std::string{"deduplication_log_"} + std::to_string(number) + ".txt"); + return path; +} + +size_t getLogNumber(const std::string & path_str) +{ + std::filesystem::path path(path_str); + std::string filename = path.stem(); + Strings filename_parts; + boost::split(filename_parts, filename, boost::is_any_of("_")); + + return parse(filename_parts[2]); /* NOTE(review): assumes the stem looks like deduplication_log_N; a stem with fewer than three underscore-separated parts makes index 2 out of range - confirm no other files can appear in logs_dir */ +} + +} + +MergeTreeDeduplicationLog::MergeTreeDeduplicationLog( + const std::string & logs_dir_, + size_t deduplication_window_, + const MergeTreeDataFormatVersion & format_version_) + : logs_dir(logs_dir_) + , deduplication_window(deduplication_window_) + , rotate_interval(deduplication_window_ * 2) /// actually it doesn't matter + , format_version(format_version_) + , deduplication_map(deduplication_window) +{ + namespace fs = std::filesystem; + if (deduplication_window != 0 && !fs::exists(logs_dir)) + fs::create_directories(logs_dir); +} + +void MergeTreeDeduplicationLog::load() +{ + namespace fs = std::filesystem; + if (!fs::exists(logs_dir)) + return; + + for (const auto & p : fs::directory_iterator(logs_dir)) + { + const auto & path = p.path(); + auto log_number = getLogNumber(path); + existing_logs[log_number] = {path, 0}; + } + + /// We should know which logs exist even 
in case + /// of deduplication_window = 0 + if (!existing_logs.empty()) + current_log_number = existing_logs.rbegin()->first; + + if (deduplication_window != 0) + { + /// Order is important: we load history from the beginning to the end + for (auto & [log_number, desc] : existing_logs) + { + try + { + desc.entries_count = loadSingleLog(desc.path); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__, "Error while loading MergeTree deduplication log on path " + desc.path); + } + } + + /// Start new log, drop previous + rotateAndDropIfNeeded(); + + /// Can happen in case we have unfinished log + if (!current_writer) + current_writer = std::make_unique(existing_logs.rbegin()->second.path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY); + } +} + +size_t MergeTreeDeduplicationLog::loadSingleLog(const std::string & path) +{ + ReadBufferFromFile read_buf(path); + + size_t total_entries = 0; + while (!read_buf.eof()) + { + MergeTreeDeduplicationLogRecord record; + readRecord(record, read_buf); + if (record.operation == MergeTreeDeduplicationOp::DROP) + deduplication_map.erase(record.block_id); + else + deduplication_map.insert(record.block_id, MergeTreePartInfo::fromPartName(record.part_name, format_version)); + total_entries++; + } + return total_entries; +} + +void MergeTreeDeduplicationLog::rotate() +{ + /// We don't deduplicate anything so we don't need any writers + if (deduplication_window == 0) + return; + + current_log_number++; + auto new_path = getLogPath(logs_dir, current_log_number); + MergeTreeDeduplicationLogNameDescription log_description{new_path, 0}; + existing_logs.emplace(current_log_number, log_description); + + if (current_writer) + current_writer->sync(); + + current_writer = std::make_unique(log_description.path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY); +} + +void MergeTreeDeduplicationLog::dropOutdatedLogs() +{ + size_t current_sum = 0; + size_t remove_from_value = 0; + /// Go from end to the beginning + for (auto 
itr = existing_logs.rbegin(); itr != existing_logs.rend(); ++itr) + { + if (current_sum > deduplication_window) + { + /// We have more logs than required, all older files (including current) can be dropped + remove_from_value = itr->first; + break; + } + + auto & description = itr->second; + current_sum += description.entries_count; + } + + /// If we found some logs to drop + if (remove_from_value != 0) + { + /// Go from the beginning to the end and drop all outdated logs + for (auto itr = existing_logs.begin(); itr != existing_logs.end();) + { + size_t number = itr->first; + std::filesystem::remove(itr->second.path); + itr = existing_logs.erase(itr); + if (remove_from_value == number) + break; + } + } + +} + +void MergeTreeDeduplicationLog::rotateAndDropIfNeeded() +{ + /// If we don't have logs at all or already have enough records in the current log + if (existing_logs.empty() || existing_logs[current_log_number].entries_count >= rotate_interval) + { + rotate(); + dropOutdatedLogs(); + } +} + +std::pair MergeTreeDeduplicationLog::addPart(const std::string & block_id, const MergeTreePartInfo & part_info) +{ + std::lock_guard lock(state_mutex); + + /// We support zero case because user may want to disable deduplication with + /// ALTER MODIFY SETTING query. It's much simpler to handle zero case + /// here than to destroy the whole object, check for null pointer from different + /// threads and so on. 
+ if (deduplication_window == 0) + return std::make_pair(part_info, true); + + /// If we already have this block let's deduplicate it + if (deduplication_map.contains(block_id)) + { + auto info = deduplication_map.get(block_id); + return std::make_pair(info, false); + } + + assert(current_writer != nullptr); + + /// Create new record + MergeTreeDeduplicationLogRecord record; + record.operation = MergeTreeDeduplicationOp::ADD; + record.part_name = part_info.getPartName(); + record.block_id = block_id; + /// Write it to disk + writeRecord(record, *current_writer); + /// We have one more record in current log + existing_logs[current_log_number].entries_count++; + /// Add to deduplication map + deduplication_map.insert(record.block_id, part_info); + /// Rotate and drop old logs if needed + rotateAndDropIfNeeded(); + + return std::make_pair(part_info, true); +} + +void MergeTreeDeduplicationLog::dropPart(const MergeTreePartInfo & drop_part_info) +{ + std::lock_guard lock(state_mutex); + + /// We support zero case because user may want to disable deduplication with + /// ALTER MODIFY SETTING query. It's much simpler to handle zero case + /// here than to destroy the whole object, check for null pointer from different + /// threads and so on. 
+ if (deduplication_window == 0) + return; + + assert(current_writer != nullptr); + + for (auto itr = deduplication_map.begin(); itr != deduplication_map.end(); /* no increment here, we are erasing from the map */) + { + const auto & part_info = itr->value; + /// Part is covered by dropped part, let's remove it from + /// deduplication history + if (drop_part_info.contains(part_info)) + { + /// Create drop record + MergeTreeDeduplicationLogRecord record; + record.operation = MergeTreeDeduplicationOp::DROP; + record.part_name = part_info.getPartName(); + record.block_id = itr->key; + /// Write it to disk + writeRecord(record, *current_writer); + /// We have one more record on disk + existing_logs[current_log_number].entries_count++; + + /// Increment itr before erase, otherwise it will be invalidated + ++itr; + /// Remove block_id from in-memory table + deduplication_map.erase(record.block_id); + + /// Rotate and drop old logs if needed + rotateAndDropIfNeeded(); + } + else + { + ++itr; + } + } +} + +void MergeTreeDeduplicationLog::setDeduplicationWindowSize(size_t deduplication_window_) +{ + std::lock_guard lock(state_mutex); + + deduplication_window = deduplication_window_; + rotate_interval = deduplication_window * 2; + + /// If the setting was set for the first time with ALTER MODIFY SETTING query + if (deduplication_window != 0 && !std::filesystem::exists(logs_dir)) + std::filesystem::create_directories(logs_dir); + + deduplication_map.setMaxSize(deduplication_window); + rotateAndDropIfNeeded(); + + /// Can happen in case we have unfinished log. NOTE(review): rotate() does nothing when deduplication_window == 0, so confirm existing_logs cannot be empty here before rbegin() is dereferenced + if (!current_writer) + current_writer = std::make_unique(existing_logs.rbegin()->second.path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY); +} + +} diff --git a/src/Storages/MergeTree/MergeTreeDeduplicationLog.h b/src/Storages/MergeTree/MergeTreeDeduplicationLog.h new file mode 100644 index 00000000000..281a76050a2 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeDeduplicationLog.h @@ -0,0 +1,192 @@ +#pragma once 
+#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +/// Description of deduplication log +struct MergeTreeDeduplicationLogNameDescription +{ + /// Path to log + std::string path; + + /// How many entries we have in log + size_t entries_count; +}; + +/// Simple string-key HashTable with fixed size based on STL containers. +/// Preserves insertion order using a linked list and removes elements +/// on overflow in FIFO order. NOTE(review): insert() with max_size == 0 calls queue.front() on an empty list; the callers in this patch return early when deduplication_window == 0 - confirm no other caller can reach it. +template +class LimitedOrderedHashMap +{ +private: + struct ListNode + { + std::string key; + V value; + }; + using Queue = std::list; + using IndexMap = std::unordered_map; + + Queue queue; + IndexMap map; + size_t max_size; +public: + using iterator = typename Queue::iterator; + using const_iterator = typename Queue::const_iterator; + using reverse_iterator = typename Queue::reverse_iterator; + using const_reverse_iterator = typename Queue::const_reverse_iterator; + + explicit LimitedOrderedHashMap(size_t max_size_) + : max_size(max_size_) + {} + + bool contains(const std::string & key) const + { + return map.find(key) != map.end(); + } + + V get(const std::string & key) const + { + return map.at(key)->value; + } + + size_t size() const + { + return queue.size(); + } + + void setMaxSize(size_t max_size_) + { + max_size = max_size_; + while (size() > max_size) + { + map.erase(queue.front().key); + queue.pop_front(); + } + } + + bool erase(const std::string & key) + { + auto it = map.find(key); + if (it == map.end()) + return false; + + auto queue_itr = it->second; + map.erase(it); + queue.erase(queue_itr); + + return true; + } + + bool insert(const std::string & key, const V & value) + { + auto it = map.find(key); + if (it != map.end()) + return false; + + if (size() == max_size) + { + map.erase(queue.front().key); + queue.pop_front(); + } + + ListNode elem{key, value}; + auto itr = queue.insert(queue.end(), elem); + map.emplace(itr->key, itr); + return true; + } + + void clear() + { + map.clear(); 
+ queue.clear(); + } + + iterator begin() { return queue.begin(); } + const_iterator begin() const { return queue.cbegin(); } + iterator end() { return queue.end(); } + const_iterator end() const { return queue.cend(); } + + reverse_iterator rbegin() { return queue.rbegin(); } + const_reverse_iterator rbegin() const { return queue.crbegin(); } + reverse_iterator rend() { return queue.rend(); } + const_reverse_iterator rend() const { return queue.crend(); } +}; + +/// Fixed-size log for deduplication in non-replicated MergeTree. +/// Stores records on disk for zero-level parts in human-readable format (operation: 1 = ADD, 2 = DROP): +/// operation part_name partition_id_check_sum +/// 1 88_18_18_0 88_10619499460461868496_9553701830997749308 +/// 2 77_14_14_0 77_15147918179036854170_6725063583757244937 +/// 2 77_15_15_0 77_14977227047908934259_8047656067364802772 +/// 1 77_20_20_0 77_15147918179036854170_6725063583757244937 +/// Also stores them in memory in hash table with limited size. +class MergeTreeDeduplicationLog +{ +public: + MergeTreeDeduplicationLog( + const std::string & logs_dir_, + size_t deduplication_window_, + const MergeTreeDataFormatVersion & format_version_); + + /// Add part into in-memory hash table and to disk. + /// Returns the passed part info and true if the part was accepted (always the case when the window is 0). + /// Otherwise, in case of a duplicate, returns the info of the previously added part with the same block id (useful for logging) and false. + std::pair addPart(const std::string & block_id, const MergeTreePartInfo & part); + + /// Remove all covered parts from in memory table and add DROP records to the disk + void dropPart(const MergeTreePartInfo & drop_part_info); + + /// Load history from disk. Ignores broken logs. + void load(); + + void setDeduplicationWindowSize(size_t deduplication_window_); +private: + const std::string logs_dir; + /// Size of deduplication window + size_t deduplication_window; + + /// How often we create new logs. 
Not very important, + /// default value equals deduplication_window * 2 + size_t rotate_interval; + const MergeTreeDataFormatVersion format_version; + + /// Current log number. Always growing number. + size_t current_log_number = 0; + + /// All existing logs in order of their numbers + std::map existing_logs; + + /// In memory hash-table + LimitedOrderedHashMap deduplication_map; + + /// Writer to the current log file + std::unique_ptr current_writer; + + /// Overall mutex because we can have a lot of concurrent inserts + std::mutex state_mutex; + + /// Start new log + void rotate(); + + /// Remove old logs whose records are no longer needed to cover deduplication_window + void dropOutdatedLogs(); + + /// Execute both previous methods if needed + void rotateAndDropIfNeeded(); + + /// Load single log from disk. In case of corruption throws exceptions + size_t loadSingleLog(const std::string & path); +}; + +} diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index 7a1ef8aeed6..f422f00f4dc 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -2,6 +2,7 @@ #include #include +#include namespace Poco::Util @@ -54,6 +55,7 @@ struct Settings; M(UInt64, write_ahead_log_bytes_to_fsync, 100ULL * 1024 * 1024, "Amount of bytes, accumulated in WAL to do fsync.", 0) \ M(UInt64, write_ahead_log_interval_ms_to_fsync, 100, "Interval in milliseconds after which fsync for WAL is being done.", 0) \ M(Bool, in_memory_parts_insert_sync, false, "If true insert of part with in-memory format will wait for fsync of WAL", 0) \ + M(UInt64, non_replicated_deduplication_window, 0, "How many last blocks of hashes should be kept on disk (0 - disabled).", 0) \ \ /** Inserts settings. 
*/ \ M(UInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table.", 0) \ diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index 529e3d2ab49..d22f94fefe3 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -155,18 +155,9 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block) if (deduplicate) { - SipHash hash; - part->checksums.computeTotalChecksumDataOnly(hash); - union - { - char bytes[16]; - UInt64 words[2]; - } hash_value; - hash.get128(hash_value.bytes); - /// We add the hash from the data and partition identifier to deduplication ID. /// That is, do not insert the same data to the same partition twice. - block_id = part->info.partition_id + "_" + toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]); + block_id = part->getZeroLevelPartBlockID(); LOG_DEBUG(log, "Wrote block with ID '{}', {} rows", block_id, current_block.block.rows()); } diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 10790057ac9..eeb8df4d329 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -93,6 +93,8 @@ StorageMergeTree::StorageMergeTree( increment.set(getMaxBlockNumber()); loadMutations(); + + loadDeduplicationLog(); } @@ -265,6 +267,7 @@ void StorageMergeTree::alter( TableLockHolder & table_lock_holder) { auto table_id = getStorageID(); + auto old_storage_settings = getSettings(); StorageInMemoryMetadata new_metadata = getInMemoryMetadata(); StorageInMemoryMetadata old_metadata = getInMemoryMetadata(); @@ -299,6 +302,21 @@ void StorageMergeTree::alter( if (!maybe_mutation_commands.empty()) waitForMutation(mutation_version, mutation_file_name); } + + { + /// Some additional changes in settings + auto 
new_storage_settings = getSettings(); + + if (old_storage_settings->non_replicated_deduplication_window != new_storage_settings->non_replicated_deduplication_window) + { + /// We cannot place this check into settings sanityCheck because it depends on format_version. + /// sanityCheck must work even without a storage. NOTE(review): at this point getSettings() already returns the new value, so confirm that throwing here cannot leave the ALTER half-applied. + if (new_storage_settings->non_replicated_deduplication_window != 0 && format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) + throw Exception("Deduplication for non-replicated MergeTree in old syntax is not supported", ErrorCodes::BAD_ARGUMENTS); + + deduplication_log->setDeduplicationWindowSize(new_storage_settings->non_replicated_deduplication_window); + } + } } @@ -614,6 +632,16 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id) return CancellationCode::CancelSent; } +void StorageMergeTree::loadDeduplicationLog() +{ + auto settings = getSettings(); + if (settings->non_replicated_deduplication_window != 0 && format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) + throw Exception("Deduplication for non-replicated MergeTree in old syntax is not supported", ErrorCodes::BAD_ARGUMENTS); + + std::string path = getDataPaths()[0] + "/deduplication_logs"; + deduplication_log = std::make_unique(path, settings->non_replicated_deduplication_window, format_version); + deduplication_log->load(); +} void StorageMergeTree::loadMutations() { @@ -1209,6 +1237,12 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, bool } } + if (deduplication_log) + { + for (const auto & part : parts_to_remove) + deduplication_log->dropPart(part->info); + } + if (detach) LOG_INFO(log, "Detached {} parts.", parts_to_remove.size()); else diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 246ce151a02..1ae21608190 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -12,6 +12,8 @@ #include #include #include +#include + 
#include #include #include @@ -93,6 +95,8 @@ public: CheckResults checkData(const ASTPtr & query, const Context & context) override; std::optional getDataProcessingJob() override; + + MergeTreeDeduplicationLog * getDeduplicationLog() { return deduplication_log.get(); } private: /// Mutex and condvar for synchronous mutations wait @@ -105,6 +109,8 @@ private: BackgroundJobsExecutor background_executor; BackgroundMovesExecutor background_moves_executor; + std::unique_ptr deduplication_log; + /// For block numbers. SimpleIncrement increment; @@ -128,6 +134,10 @@ private: void loadMutations(); + /// Load and initialize deduplication logs. Even if the deduplication setting + /// equals zero, creates an object with a deduplication window of zero. + void loadDeduplicationLog(); + /** Determines what parts should be merged and merges it. * If aggressive - when selects parts don't takes into account their ratio size and novelty (used for OPTIMIZE query). * Returns true if merge is finished successfully. 
diff --git a/src/Storages/ya.make b/src/Storages/ya.make index e3e1807c566..2afdbe8c749 100644 --- a/src/Storages/ya.make +++ b/src/Storages/ya.make @@ -57,6 +57,7 @@ SRCS( MergeTree/MergeTreeDataPartWriterWide.cpp MergeTree/MergeTreeDataSelectExecutor.cpp MergeTree/MergeTreeDataWriter.cpp + MergeTree/MergeTreeDeduplicationLog.cpp MergeTree/MergeTreeIndexAggregatorBloomFilter.cpp MergeTree/MergeTreeIndexBloomFilter.cpp MergeTree/MergeTreeIndexConditionBloomFilter.cpp diff --git a/tests/queries/0_stateless/01781_merge_tree_deduplication.reference b/tests/queries/0_stateless/01781_merge_tree_deduplication.reference new file mode 100644 index 00000000000..cb5a3f1ff52 --- /dev/null +++ b/tests/queries/0_stateless/01781_merge_tree_deduplication.reference @@ -0,0 +1,85 @@ +1 1 +1 1 +=============== +1 1 +1 1 +2 2 +3 3 +4 4 +=============== +1 1 +1 1 +2 2 +3 3 +4 4 +5 5 +6 6 +7 7 +=============== +1 1 +1 1 +2 2 +3 3 +4 4 +5 5 +6 6 +7 7 +8 8 +9 9 +10 10 +11 11 +12 12 +=============== +10 10 +12 12 +=============== +1 1 +1 1 +2 2 +3 3 +4 4 +5 5 +6 6 +8 8 +9 9 +11 11 +12 12 +=============== +88 11 11 +77 11 11 +77 12 12 +=============== +1 1 33 +1 1 33 +2 2 33 +3 3 33 +=============== +1 1 33 +1 1 33 +1 1 33 +1 1 33 +2 2 33 +3 3 33 +=============== +1 1 33 +1 1 33 +1 1 33 +1 1 33 +1 1 33 +2 2 33 +3 3 33 +=============== +1 1 44 +2 2 44 +3 3 44 +4 4 44 +=============== +1 1 +1 1 +=============== +1 1 +1 1 +1 1 +2 2 +3 3 +4 4 diff --git a/tests/queries/0_stateless/01781_merge_tree_deduplication.sql b/tests/queries/0_stateless/01781_merge_tree_deduplication.sql new file mode 100644 index 00000000000..236f7b35b80 --- /dev/null +++ b/tests/queries/0_stateless/01781_merge_tree_deduplication.sql @@ -0,0 +1,187 @@ +DROP TABLE IF EXISTS merge_tree_deduplication; + +CREATE TABLE merge_tree_deduplication +( + key UInt64, + value String, + part UInt8 DEFAULT 77 +) +ENGINE=MergeTree() +ORDER BY key +PARTITION BY part +SETTINGS non_replicated_deduplication_window=3; + +SYSTEM STOP MERGES 
merge_tree_deduplication; + +INSERT INTO merge_tree_deduplication (key, value) VALUES (1, '1'); + +SELECT key, value FROM merge_tree_deduplication; + +INSERT INTO merge_tree_deduplication (key, value) VALUES (1, '1'); + +SELECT key, value FROM merge_tree_deduplication; + +SELECT '==============='; + +INSERT INTO merge_tree_deduplication (key, value) VALUES (2, '2'); + +INSERT INTO merge_tree_deduplication (key, value) VALUES (3, '3'); + +INSERT INTO merge_tree_deduplication (key, value) VALUES (4, '4'); + +INSERT INTO merge_tree_deduplication (key, value) VALUES (1, '1'); + +SELECT key, value FROM merge_tree_deduplication ORDER BY key; + +SELECT '==============='; + +INSERT INTO merge_tree_deduplication (key, value) VALUES (5, '5'); + +INSERT INTO merge_tree_deduplication (key, value) VALUES (6, '6'); + +INSERT INTO merge_tree_deduplication (key, value) VALUES (7, '7'); + +INSERT INTO merge_tree_deduplication (key, value) VALUES (5, '5'); + +SELECT key, value FROM merge_tree_deduplication ORDER BY key; + +SELECT '==============='; + +INSERT INTO merge_tree_deduplication (key, value) VALUES (8, '8'); + +INSERT INTO merge_tree_deduplication (key, value) VALUES (9, '9'); + +INSERT INTO merge_tree_deduplication (key, value) VALUES (10, '10'); + +INSERT INTO merge_tree_deduplication (key, value) VALUES (11, '11'); + +INSERT INTO merge_tree_deduplication (key, value) VALUES (12, '12'); + +INSERT INTO merge_tree_deduplication (key, value) VALUES (10, '10'); +INSERT INTO merge_tree_deduplication (key, value) VALUES (11, '11'); +INSERT INTO merge_tree_deduplication (key, value) VALUES (12, '12'); + +SELECT key, value FROM merge_tree_deduplication ORDER BY key; + +SELECT '==============='; + +ALTER TABLE merge_tree_deduplication DROP PART '77_9_9_0'; -- some old part + +INSERT INTO merge_tree_deduplication (key, value) VALUES (10, '10'); + +SELECT key, value FROM merge_tree_deduplication WHERE key = 10; + +ALTER TABLE merge_tree_deduplication DROP PART '77_13_13_0'; -- fresh 
part + +INSERT INTO merge_tree_deduplication (key, value) VALUES (12, '12'); + +SELECT key, value FROM merge_tree_deduplication WHERE key = 12; + +DETACH TABLE merge_tree_deduplication; +ATTACH TABLE merge_tree_deduplication; + +OPTIMIZE TABLE merge_tree_deduplication FINAL; + +INSERT INTO merge_tree_deduplication (key, value) VALUES (11, '11'); -- deduplicated +INSERT INTO merge_tree_deduplication (key, value) VALUES (12, '12'); -- deduplicated + +SELECT '==============='; + +SELECT key, value FROM merge_tree_deduplication ORDER BY key; + +SELECT '==============='; + +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (11, '11', 88); + +ALTER TABLE merge_tree_deduplication DROP PARTITION 77; + +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (11, '11', 88); --deduplicated + +INSERT INTO merge_tree_deduplication (key, value) VALUES (11, '11'); -- not deduplicated +INSERT INTO merge_tree_deduplication (key, value) VALUES (12, '12'); -- not deduplicated + +SELECT part, key, value FROM merge_tree_deduplication ORDER BY key; + +-- Alters.... 
+ +ALTER TABLE merge_tree_deduplication MODIFY SETTING non_replicated_deduplication_window = 2; + +SELECT '==============='; + +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 33); +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (2, '2', 33); +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (3, '3', 33); +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 33); + +SELECT * FROM merge_tree_deduplication WHERE part = 33 ORDER BY key; + +SELECT '==============='; + +ALTER TABLE merge_tree_deduplication MODIFY SETTING non_replicated_deduplication_window = 0; + +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 33); +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 33); + +DETACH TABLE merge_tree_deduplication; +ATTACH TABLE merge_tree_deduplication; + +SELECT * FROM merge_tree_deduplication WHERE part = 33 ORDER BY key; + +SELECT '==============='; + +ALTER TABLE merge_tree_deduplication MODIFY SETTING non_replicated_deduplication_window = 3; + +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 33); + +SELECT * FROM merge_tree_deduplication WHERE part = 33 ORDER BY key; + +SELECT '==============='; + +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 44); +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (2, '2', 44); +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (3, '3', 44); +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 44); + +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (4, '4', 44); + +DETACH TABLE merge_tree_deduplication; +ATTACH TABLE merge_tree_deduplication; + +SELECT * FROM merge_tree_deduplication WHERE part = 44 ORDER BY key; + +DROP TABLE IF EXISTS merge_tree_deduplication; + +SELECT '==============='; + +DROP TABLE IF EXISTS merge_tree_no_deduplication; + +CREATE TABLE merge_tree_no_deduplication +( + key UInt64, + 
value String +) +ENGINE=MergeTree() +ORDER BY key; + +INSERT INTO merge_tree_no_deduplication (key, value) VALUES (1, '1'); +INSERT INTO merge_tree_no_deduplication (key, value) VALUES (1, '1'); + +SELECT * FROM merge_tree_no_deduplication ORDER BY key; + +SELECT '==============='; + +ALTER TABLE merge_tree_no_deduplication MODIFY SETTING non_replicated_deduplication_window = 3; + +INSERT INTO merge_tree_no_deduplication (key, value) VALUES (1, '1'); +INSERT INTO merge_tree_no_deduplication (key, value) VALUES (2, '2'); +INSERT INTO merge_tree_no_deduplication (key, value) VALUES (3, '3'); + +DETACH TABLE merge_tree_no_deduplication; +ATTACH TABLE merge_tree_no_deduplication; + +INSERT INTO merge_tree_no_deduplication (key, value) VALUES (1, '1'); +INSERT INTO merge_tree_no_deduplication (key, value) VALUES (4, '4'); + +SELECT * FROM merge_tree_no_deduplication ORDER BY key; + +DROP TABLE IF EXISTS merge_tree_no_deduplication; diff --git a/tests/queries/skip_list.json b/tests/queries/skip_list.json index 6eb828cea21..fc812cbc8c8 100644 --- a/tests/queries/skip_list.json +++ b/tests/queries/skip_list.json @@ -148,6 +148,7 @@ "00626_replace_partition_from_table", "00152_insert_different_granularity", "00054_merge_tree_partitions", + "01781_merge_tree_deduplication", /// Old syntax is not allowed "01062_alter_on_mutataion_zookeeper", "00925_zookeeper_empty_replicated_merge_tree_optimize_final",