diff --git a/src/Storages/MergeTree/MergeTreeDeduplicationLog.cpp b/src/Storages/MergeTree/MergeTreeDeduplicationLog.cpp index 9cbffe977c4..0022a40eca3 100644 --- a/src/Storages/MergeTree/MergeTreeDeduplicationLog.cpp +++ b/src/Storages/MergeTree/MergeTreeDeduplicationLog.cpp @@ -80,13 +80,17 @@ MergeTreeDeduplicationLog::MergeTreeDeduplicationLog( , rotate_interval(deduplication_window_ * 2) /// actually it doesn't matter , format_version(format_version_) , deduplication_map(deduplication_window) -{} +{ + namespace fs = std::filesystem; + if (deduplication_window != 0 && !fs::exists(logs_dir)) + fs::create_directories(logs_dir); +} void MergeTreeDeduplicationLog::load() { namespace fs = std::filesystem; if (!fs::exists(logs_dir)) - fs::create_directories(logs_dir); + return; for (const auto & p : fs::directory_iterator(logs_dir)) { @@ -95,26 +99,33 @@ void MergeTreeDeduplicationLog::load() existing_logs[log_number] = {path, 0}; } - /// Order important, we load history from the begging to the end - for (auto & [log_number, desc] : existing_logs) + /// We should know which logs are exist even in case + /// of deduplication_window = 0 + if (!existing_logs.empty()) + current_log_number = existing_logs.rbegin()->first; + + if (deduplication_window != 0) { - try + /// Order important, we load history from the begging to the end + for (auto & [log_number, desc] : existing_logs) { - desc.entries_count = loadSingleLog(desc.path); - current_log_number = log_number; - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__, "Error while loading MergeTree deduplication log on path " + desc.path); + try + { + desc.entries_count = loadSingleLog(desc.path); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__, "Error while loading MergeTree deduplication log on path " + desc.path); + } } + + /// Start new log, drop previous + rotateAndDropIfNeeded(); + + /// Can happen in case we have unfinished log + if (!current_writer) + current_writer = std::make_unique(existing_logs.rbegin()->second.path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY); } - - /// Start new log, drop previous - rotateAndDropIfNeeded(); - - /// Can happen in case we have unfinished log - if (!current_writer) - current_writer = std::make_unique(existing_logs.rbegin()->second.path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY); } size_t MergeTreeDeduplicationLog::loadSingleLog(const std::string & path) @@ -137,6 +148,10 @@ size_t MergeTreeDeduplicationLog::loadSingleLog(const std::string & path) void MergeTreeDeduplicationLog::rotate() { + /// We don't deduplicate anything so we don't need any writers + if (deduplication_window == 0) + return; + current_log_number++; auto new_path = getLogPath(logs_dir, current_log_number); MergeTreeDeduplicationLogNameDescription log_description{new_path, 0}; @@ -169,7 +184,7 @@ void MergeTreeDeduplicationLog::dropOutdatedLogs() /// If we found some logs to drop if (remove_from_value != 0) { - /// Go from beginning to the end and drop all outdated logs + /// Go from the beginning to the end and drop all outdated logs for (auto itr = existing_logs.begin(); itr != existing_logs.end();) { size_t number = itr->first; @@ -196,6 +211,13 @@ std::pair MergeTreeDeduplicationLog::addPart(const std: { std::lock_guard lock(state_mutex); + /// We support zero case because user may want to disable deduplication with + /// ALTER MODIFY SETTING query. It's much more simplier to handle zero case + /// here then destroy whole object, check for null pointer from different + /// threads and so on. + if (deduplication_window == 0) + return std::make_pair(part_info, true); + /// If we already have this block let's deduplicate it if (deduplication_map.contains(block_id)) { @@ -226,6 +248,13 @@ void MergeTreeDeduplicationLog::dropPart(const MergeTreePartInfo & drop_part_inf { std::lock_guard lock(state_mutex); + /// We support zero case because user may want to disable deduplication with + /// ALTER MODIFY SETTING query. It's much more simplier to handle zero case + /// here then destroy whole object, check for null pointer from different + /// threads and so on. + if (deduplication_window == 0) + return; + assert(current_writer != nullptr); for (auto itr = deduplication_map.begin(); itr != deduplication_map.end(); /* no increment here, we erasing from map */) @@ -260,4 +289,23 @@ void MergeTreeDeduplicationLog::dropPart(const MergeTreePartInfo & drop_part_inf } } +void MergeTreeDeduplicationLog::setDeduplicationWindowSize(size_t deduplication_window_) +{ + std::lock_guard lock(state_mutex); + + deduplication_window = deduplication_window_; + rotate_interval = deduplication_window * 2; + + /// If settings was set for the first time with ALTER MODIFY SETTING query + if (deduplication_window != 0 && !std::filesystem::exists(logs_dir)) + std::filesystem::create_directories(logs_dir); + + deduplication_map.setMaxSize(deduplication_window); + rotateAndDropIfNeeded(); + + /// Can happen in case we have unfinished log + if (!current_writer) + current_writer = std::make_unique(existing_logs.rbegin()->second.path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY); +} + } diff --git a/src/Storages/MergeTree/MergeTreeDeduplicationLog.h b/src/Storages/MergeTree/MergeTreeDeduplicationLog.h index 643b2ef9fad..281a76050a2 100644 --- a/src/Storages/MergeTree/MergeTreeDeduplicationLog.h +++ b/src/Storages/MergeTree/MergeTreeDeduplicationLog.h @@ -39,7 +39,7 @@ private: Queue queue; IndexMap map; - const size_t max_size; + size_t max_size; public: using iterator = typename Queue::iterator; using const_iterator = typename Queue::const_iterator; @@ -65,6 +65,16 @@ public: return queue.size(); } + void setMaxSize(size_t max_size_) + { + max_size = max_size_; + while (size() > max_size) + { + map.erase(queue.front().key); + queue.pop_front(); + } + } + bool erase(const std::string & key) { auto it = map.find(key); @@ -139,14 +149,16 @@ public: /// Load history from disk. Ignores broken logs. void load(); + + void setDeduplicationWindowSize(size_t deduplication_window_); private: const std::string logs_dir; /// Size of deduplication window - const size_t deduplication_window; + size_t deduplication_window; /// How often we create new logs. Not very important, /// default value equals deduplication_window * 2 - const size_t rotate_interval; + size_t rotate_interval; const MergeTreeDataFormatVersion format_version; /// Current log number. Always growing number. diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index 06d909eb912..f422f00f4dc 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -2,6 +2,7 @@ #include #include +#include namespace Poco::Util diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 5f8032d7749..eeb8df4d329 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -94,16 +94,7 @@ StorageMergeTree::StorageMergeTree( loadMutations(); - auto settings = getSettings(); - if (settings->non_replicated_deduplication_window != 0) - { - if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) - throw Exception("Deduplication for non-replicated MergeTree in old syntax is not supported", ErrorCodes::BAD_ARGUMENTS); - - std::string path = getDataPaths()[0] + "/deduplication_logs"; - deduplication_log = std::make_unique(path, settings->non_replicated_deduplication_window, format_version); - deduplication_log->load(); - } + loadDeduplicationLog(); } @@ -276,6 +267,7 @@ void StorageMergeTree::alter( TableLockHolder & table_lock_holder) { auto table_id = getStorageID(); + auto old_storage_settings = getSettings(); StorageInMemoryMetadata new_metadata = getInMemoryMetadata(); StorageInMemoryMetadata old_metadata = getInMemoryMetadata(); @@ -310,6 +302,21 @@ void StorageMergeTree::alter( if (!maybe_mutation_commands.empty()) waitForMutation(mutation_version, mutation_file_name); } + + { + /// Some additional changes in settings + auto new_storage_settings = getSettings(); + + if (old_storage_settings->non_replicated_deduplication_window != new_storage_settings->non_replicated_deduplication_window) + { + /// We cannot place this check into settings sanityCheck because it depends on format_version. + /// sanityCheck must work event without storage. + if (new_storage_settings->non_replicated_deduplication_window != 0 && format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) + throw Exception("Deduplication for non-replicated MergeTree in old syntax is not supported", ErrorCodes::BAD_ARGUMENTS); + + deduplication_log->setDeduplicationWindowSize(new_storage_settings->non_replicated_deduplication_window); + } + } } @@ -625,6 +632,16 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id) return CancellationCode::CancelSent; } +void StorageMergeTree::loadDeduplicationLog() +{ + auto settings = getSettings(); + if (settings->non_replicated_deduplication_window != 0 && format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) + throw Exception("Deduplication for non-replicated MergeTree in old syntax is not supported", ErrorCodes::BAD_ARGUMENTS); + + std::string path = getDataPaths()[0] + "/deduplication_logs"; + deduplication_log = std::make_unique(path, settings->non_replicated_deduplication_window, format_version); + deduplication_log->load(); +} void StorageMergeTree::loadMutations() { diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 3ac7b8a0270..1ae21608190 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -134,6 +134,10 @@ private: void loadMutations(); + /// Load and initialize deduplication logs. Even if deduplication setting + /// equals zero creates object with deduplication window equals zero. + void loadDeduplicationLog(); + /** Determines what parts should be merged and merges it. * If aggressive - when selects parts don't takes into account their ratio size and novelty (used for OPTIMIZE query). * Returns true if merge is finished successfully. diff --git a/tests/queries/0_stateless/01781_merge_tree_deduplication.reference b/tests/queries/0_stateless/01781_merge_tree_deduplication.reference index 402a8919da5..cb5a3f1ff52 100644 --- a/tests/queries/0_stateless/01781_merge_tree_deduplication.reference +++ b/tests/queries/0_stateless/01781_merge_tree_deduplication.reference @@ -48,3 +48,38 @@ 88 11 11 77 11 11 77 12 12 +=============== +1 1 33 +1 1 33 +2 2 33 +3 3 33 +=============== +1 1 33 +1 1 33 +1 1 33 +1 1 33 +2 2 33 +3 3 33 +=============== +1 1 33 +1 1 33 +1 1 33 +1 1 33 +1 1 33 +2 2 33 +3 3 33 +=============== +1 1 44 +2 2 44 +3 3 44 +4 4 44 +=============== +1 1 +1 1 +=============== +1 1 +1 1 +1 1 +2 2 +3 3 +4 4 diff --git a/tests/queries/0_stateless/01781_merge_tree_deduplication.sql b/tests/queries/0_stateless/01781_merge_tree_deduplication.sql index 7e4b6f7db2b..236f7b35b80 100644 --- a/tests/queries/0_stateless/01781_merge_tree_deduplication.sql +++ b/tests/queries/0_stateless/01781_merge_tree_deduplication.sql @@ -102,4 +102,86 @@ INSERT INTO merge_tree_deduplication (key, value) VALUES (12, '12'); -- not dedu SELECT part, key, value FROM merge_tree_deduplication ORDER BY key; +-- Alters.... + +ALTER TABLE merge_tree_deduplication MODIFY SETTING non_replicated_deduplication_window = 2; + +SELECT '==============='; + +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 33); +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (2, '2', 33); +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (3, '3', 33); +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 33); + +SELECT * FROM merge_tree_deduplication WHERE part = 33 ORDER BY key; + +SELECT '==============='; + +ALTER TABLE merge_tree_deduplication MODIFY SETTING non_replicated_deduplication_window = 0; + +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 33); +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 33); + +DETACH TABLE merge_tree_deduplication; +ATTACH TABLE merge_tree_deduplication; + +SELECT * FROM merge_tree_deduplication WHERE part = 33 ORDER BY key; + +SELECT '==============='; + +ALTER TABLE merge_tree_deduplication MODIFY SETTING non_replicated_deduplication_window = 3; + +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 33); + +SELECT * FROM merge_tree_deduplication WHERE part = 33 ORDER BY key; + +SELECT '==============='; + +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 44); +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (2, '2', 44); +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (3, '3', 44); +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 44); + +INSERT INTO merge_tree_deduplication (key, value, part) VALUES (4, '4', 44); + +DETACH TABLE merge_tree_deduplication; +ATTACH TABLE merge_tree_deduplication; + +SELECT * FROM merge_tree_deduplication WHERE part = 44 ORDER BY key; + DROP TABLE IF EXISTS merge_tree_deduplication; + +SELECT '==============='; + +DROP TABLE IF EXISTS merge_tree_no_deduplication; + +CREATE TABLE merge_tree_no_deduplication +( + key UInt64, + value String +) +ENGINE=MergeTree() +ORDER BY key; + +INSERT INTO merge_tree_no_deduplication (key, value) VALUES (1, '1'); +INSERT INTO merge_tree_no_deduplication (key, value) VALUES (1, '1'); + +SELECT * FROM merge_tree_no_deduplication ORDER BY key; + +SELECT '==============='; + +ALTER TABLE merge_tree_no_deduplication MODIFY SETTING non_replicated_deduplication_window = 3; + +INSERT INTO merge_tree_no_deduplication (key, value) VALUES (1, '1'); +INSERT INTO merge_tree_no_deduplication (key, value) VALUES (2, '2'); +INSERT INTO merge_tree_no_deduplication (key, value) VALUES (3, '3'); + +DETACH TABLE merge_tree_no_deduplication; +ATTACH TABLE merge_tree_no_deduplication; + +INSERT INTO merge_tree_no_deduplication (key, value) VALUES (1, '1'); +INSERT INTO merge_tree_no_deduplication (key, value) VALUES (4, '4'); + +SELECT * FROM merge_tree_no_deduplication ORDER BY key; + +DROP TABLE IF EXISTS merge_tree_no_deduplication;