From b08056fa8c0f84670bab96b5643dd36850db0d8a Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 2 Sep 2020 11:18:50 +0300 Subject: [PATCH] Better selection of Merges with TTL --- src/Storages/MergeTree/MergeList.cpp | 2 ++ src/Storages/MergeTree/MergeList.h | 2 ++ .../MergeTree/MergeTreeDataMergerMutator.cpp | 32 +++++++++++++++---- .../MergeTree/MergeTreeDataMergerMutator.h | 7 ++-- src/Storages/MergeTree/MergeTreeSettings.h | 4 ++- src/Storages/MergeTree/MergeType.cpp | 27 ++++++++++++++++ src/Storages/MergeTree/MergeType.h | 17 ++++++++++ .../MergeTree/ReplicatedMergeTreeLogEntry.cpp | 13 ++++++++ .../MergeTree/ReplicatedMergeTreeLogEntry.h | 2 ++ .../MergeTree/ReplicatedMergeTreeQueue.cpp | 11 +++++-- .../MergeTree/ReplicatedMergeTreeQueue.h | 9 +++++- src/Storages/StorageMergeTree.cpp | 10 ++++-- src/Storages/StorageReplicatedMergeTree.cpp | 31 +++++++++++------- src/Storages/StorageReplicatedMergeTree.h | 3 +- src/Storages/System/StorageSystemMerges.cpp | 2 ++ 15 files changed, 145 insertions(+), 27 deletions(-) create mode 100644 src/Storages/MergeTree/MergeType.cpp create mode 100644 src/Storages/MergeTree/MergeType.h diff --git a/src/Storages/MergeTree/MergeList.cpp b/src/Storages/MergeTree/MergeList.cpp index e9d955f5395..5e7b7046c85 100644 --- a/src/Storages/MergeTree/MergeList.cpp +++ b/src/Storages/MergeTree/MergeList.cpp @@ -21,6 +21,7 @@ MergeListElement::MergeListElement(const std::string & database_, const std::str , result_data_version{future_part.part_info.getDataVersion()} , num_parts{future_part.parts.size()} , thread_id{getThreadId()} + , merge_type{toString(future_part.merge_type)} { for (const auto & source_part : future_part.parts) { @@ -70,6 +71,7 @@ MergeInfo MergeListElement::getInfo() const res.columns_written = columns_written.load(std::memory_order_relaxed); res.memory_usage = memory_tracker.get(); res.thread_id = thread_id; + res.merge_type = merge_type; for (const auto & source_part_name : source_part_names) 
res.source_part_names.emplace_back(source_part_name); diff --git a/src/Storages/MergeTree/MergeList.h b/src/Storages/MergeTree/MergeList.h index 4ee8a75a868..e6ae0407ec0 100644 --- a/src/Storages/MergeTree/MergeList.h +++ b/src/Storages/MergeTree/MergeList.h @@ -45,6 +45,7 @@ struct MergeInfo UInt64 columns_written; UInt64 memory_usage; UInt64 thread_id; + std::string merge_type; }; struct FutureMergedMutatedPart; @@ -88,6 +89,7 @@ struct MergeListElement : boost::noncopyable UInt64 thread_id; + const std::string merge_type; MergeListElement(const std::string & database, const std::string & table, const FutureMergedMutatedPart & future_part); diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 673ad02bfb6..a0ab7866402 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -158,15 +158,15 @@ MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, si } -UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge() +UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(MergeType merge_type) { size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed); - return getMaxSourcePartsSizeForMerge(background_pool_size, busy_threads_in_pool == 0 ? 0 : busy_threads_in_pool - 1); /// 1 is current thread + return getMaxSourcePartsSizeForMerge(background_pool_size, busy_threads_in_pool == 0 ? 
0 : busy_threads_in_pool - 1, merge_type); /// 1 is current thread } -UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used) +UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used, MergeType merge_type) { if (pool_used > pool_size) throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR); @@ -178,14 +178,21 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_siz /// One entry is probably the entry where this function is executed. /// This will protect from bad settings. + + size_t lowering_setting; + if (merge_type == MergeType::TTL_DELETE) + lowering_setting = data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge_with_ttl; + else + lowering_setting = data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge; + UInt64 max_size = 0; - if (pool_used <= 1 || free_entries >= data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge) + if (pool_used <= 1 || free_entries >= lowering_setting) max_size = data_settings->max_bytes_to_merge_at_max_space_in_pool; else max_size = interpolateExponential( data_settings->max_bytes_to_merge_at_min_space_in_pool, data_settings->max_bytes_to_merge_at_max_space_in_pool, - static_cast(free_entries) / data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge); + static_cast(free_entries) / lowering_setting); return std::min(max_size, static_cast(data.getStoragePolicy()->getMaxUnreservedFreeSpace() / DISK_USAGE_COEFFICIENT_TO_SELECT)); } @@ -213,6 +220,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge( bool aggressive, size_t max_total_size_to_merge, const AllowedMergingPredicate & can_merge_callback, + size_t max_total_size_to_merge_with_ttl, String * out_disable_reason) { MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector(); @@ -284,7 +292,9 @@ bool 
MergeTreeDataMergerMutator::selectPartsToMerge( current_time, data_settings->merge_with_ttl_timeout, data_settings->ttl_only_drop_parts); - parts_to_merge = merge_selector.select(partitions, max_total_size_to_merge); + + parts_to_merge = merge_selector.select(partitions, max_total_size_to_merge_with_ttl); + future_part.merge_type = MergeType::TTL_DELETE; } if (parts_to_merge.empty()) @@ -306,6 +316,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge( *out_disable_reason = "There is no need to merge parts according to merge selector algorithm"; return false; } + future_part.merge_type = MergeType::NORMAL; } MergeTreeData::DataPartsVector parts; @@ -385,6 +396,12 @@ bool MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition( LOG_DEBUG(log, "Selected {} parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name); future_part.assign(std::move(parts)); + + if (final) + future_part.merge_type = MergeType::FINAL; + else + future_part.merge_type = MergeType::NORMAL; + available_disk_space -= required_disk_space; return true; } @@ -634,6 +651,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor new_data_part->partition.assign(future_part.getPartition()); new_data_part->is_temp = true; + if (future_part.merge_type == MergeType::TTL_DELETE && ttl_merges_blocker.isCancelled()) + throw Exception("Cancelled merging parts with expired TTL", ErrorCodes::ABORTED); + bool need_remove_expired_values = false; for (const auto & part : parts) new_data_part->ttl_infos.update(part->ttl_infos); diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index d5798fe3582..086a2a9cae2 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB @@ -22,6 +23,7 @@ struct FutureMergedMutatedPart MergeTreeDataPartType type; MergeTreePartInfo 
part_info; MergeTreeData::DataPartsVector parts; + MergeType merge_type = MergeType::NORMAL; const MergeTreePartition & getPartition() const { return parts.front()->partition; } @@ -57,12 +59,12 @@ public: /** Get maximum total size of parts to do merge, at current moment of time. * It depends on number of free threads in background_pool and amount of free space in disk. */ - UInt64 getMaxSourcePartsSizeForMerge(); + UInt64 getMaxSourcePartsSizeForMerge(MergeType merge_type); /** For explicitly passed size of pool and number of used tasks. * This method could be used to calculate threshold depending on number of tasks in replication queue. */ - UInt64 getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used); + UInt64 getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used, MergeType merge_type); /** Get maximum total size of parts to do mutation, at current moment of time. * It depends only on amount of free space in disk. @@ -81,6 +83,7 @@ public: bool aggressive, size_t max_total_size_to_merge, const AllowedMergingPredicate & can_merge, + size_t max_total_size_to_merge_with_ttl, String * out_disable_reason = nullptr); /** Select all the parts in the specified partition for merge, if possible. 
diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index 085c441aa90..e5707ff837c 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -33,8 +33,10 @@ struct Settings; M(UInt64, max_bytes_to_merge_at_min_space_in_pool, 1024 * 1024, "Maximum in total size of parts to merge, when there are minimum free threads in background pool (or entries in replication queue).", 0) \ M(UInt64, max_replicated_merges_in_queue, 16, "How many tasks of merging and mutating parts are allowed simultaneously in ReplicatedMergeTree queue.", 0) \ M(UInt64, max_replicated_mutations_in_queue, 8, "How many tasks of mutating parts are allowed simultaneously in ReplicatedMergeTree queue.", 0) \ + M(UInt64, max_replicated_merges_with_ttl_in_queue, 1, "How many tasks of merging parts with TTL are allowed simultaneously in ReplicatedMergeTree queue.", 0) \ M(UInt64, number_of_free_entries_in_pool_to_lower_max_size_of_merge, 8, "When there is less than specified number of free entries in pool (or replicated queue), start to lower maximum size of merge to process (or to put in queue). This is to allow small merges to process - not filling the pool with long running merges.", 0) \ M(UInt64, number_of_free_entries_in_pool_to_execute_mutation, 10, "When there is less than specified number of free entries in pool, do not execute part mutations. This is to leave free threads for regular merges and avoid \"Too many parts\"", 0) \ + M(UInt64, number_of_free_entries_in_pool_to_lower_max_size_of_merge_with_ttl, 14, "When there is less than specified number of free entries in pool (or replicated queue), start to lower maximum size of merge with TTL to process (or to put in queue). 
This is to allow small merges to process - not filling the pool with long running merges.", 0) \ M(Seconds, old_parts_lifetime, 8 * 60, "How many seconds to keep obsolete parts.", 0) \ M(Seconds, temporary_directories_lifetime, 86400, "How many seconds to keep tmp_-directories.", 0) \ M(Seconds, lock_acquire_timeout_for_background_operations, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "For background operations like merges, mutations etc. How many seconds before failing to acquire table locks.", 0) \ @@ -83,7 +85,7 @@ struct Settings; M(UInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024, "Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled).", 0) \ M(UInt64, index_granularity_bytes, 10 * 1024 * 1024, "Approximate amount of bytes in single granule (0 - disabled).", 0) \ M(UInt64, min_index_granularity_bytes, 1024, "Minimum amount of bytes in single granule.", 1024) \ - M(Int64, merge_with_ttl_timeout, 3600 * 24, "Minimal time in seconds, when merge with TTL can be repeated.", 0) \ + M(Int64, merge_with_ttl_timeout, 0, "Minimal time in seconds, when merge with TTL can be repeated.", 0) \ M(Bool, ttl_only_drop_parts, false, "Only drop altogether the expired parts and not partially prune them.", 0) \ M(Bool, write_final_mark, 1, "Write final mark after end of column (0 - disabled, do nothing if index_granularity_bytes=0)", 0) \ M(Bool, enable_mixed_granularity_parts, 1, "Enable parts with adaptive and non adaptive granularity", 0) \ diff --git a/src/Storages/MergeTree/MergeType.cpp b/src/Storages/MergeTree/MergeType.cpp new file mode 100644 index 00000000000..b58a0de4093 --- /dev/null +++ b/src/Storages/MergeTree/MergeType.cpp @@ -0,0 +1,27 @@ +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; +} + +String toString(MergeType merge_type) +{ + switch (merge_type) + { + case MergeType::NORMAL: + return "NORMAL"; + case MergeType::FINAL: + return "FINAL"; + case MergeType::TTL_DELETE: + return 
"TTL_DELETE"; + } + + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unknown MergeType {}", static_cast(merge_type)); +} + +} diff --git a/src/Storages/MergeTree/MergeType.h b/src/Storages/MergeTree/MergeType.h new file mode 100644 index 00000000000..5d9abaa61b3 --- /dev/null +++ b/src/Storages/MergeTree/MergeType.h @@ -0,0 +1,17 @@ +#pragma once + +#include + +namespace DB +{ + +enum class MergeType +{ + NORMAL, + FINAL, + TTL_DELETE, +}; + +String toString(MergeType merge_type); + +} diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp index af6d980ad98..de8dd7f6097 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp @@ -36,6 +36,8 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const out << s << '\n'; out << "into\n" << new_part_name; out << "\ndeduplicate: " << deduplicate; + if (merge_type != MergeType::NORMAL) + out <<"\nmerge_type: " << static_cast(merge_type); break; case DROP_RANGE: @@ -149,7 +151,18 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) } in >> new_part_name; if (format_version >= 4) + { in >> "\ndeduplicate: " >> deduplicate; + in >> "\n"; + if (in.eof()) + trailing_newline_found = true; + else if (checkString("merge_type: ", in)) + { + UInt64 value; + in >> value; + merge_type = static_cast(value); + } + } } else if (type_str == "drop" || type_str == "detach") { diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h b/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h index ae5fad0b83c..bea796ce015 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -79,6 +80,7 @@ struct ReplicatedMergeTreeLogEntryData Strings source_parts; bool deduplicate = false; /// Do deduplicate on merge + MergeType merge_type = 
MergeType::NORMAL; String column_name; String index_name; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 8e2c3752212..c9b366a9ec8 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1061,7 +1061,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( return false; } - UInt64 max_source_parts_size = entry.type == LogEntry::MERGE_PARTS ? merger_mutator.getMaxSourcePartsSizeForMerge() + UInt64 max_source_parts_size = entry.type == LogEntry::MERGE_PARTS ? merger_mutator.getMaxSourcePartsSizeForMerge(entry.merge_type) : merger_mutator.getMaxSourcePartSizeForMutation(); /** If there are enough free threads in background pool to do large merges (maximal size of merge is allowed), * then ignore value returned by getMaxSourcePartsSizeForMerge() and execute merge of any size, @@ -1312,21 +1312,26 @@ bool ReplicatedMergeTreeQueue::processEntry( } -std::pair ReplicatedMergeTreeQueue::countMergesAndPartMutations() const +ReplicatedMergeTreeQueue::OperationsInQueue ReplicatedMergeTreeQueue::countMergesAndPartMutations() const { std::lock_guard lock(state_mutex); size_t count_merges = 0; size_t count_mutations = 0; + size_t count_merges_with_ttl = 0; for (const auto & entry : queue) { if (entry->type == ReplicatedMergeTreeLogEntry::MERGE_PARTS) + { ++count_merges; + if (entry->merge_type == MergeType::TTL_DELETE) + ++count_merges_with_ttl; + } else if (entry->type == ReplicatedMergeTreeLogEntry::MUTATE_PART) ++count_mutations; } - return std::make_pair(count_merges, count_mutations); + return OperationsInQueue{count_merges, count_mutations, count_merges_with_ttl}; } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 76f84da1ae8..c724701f1ff 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h 
@@ -46,6 +46,13 @@ private: } }; + struct OperationsInQueue + { + size_t merges = 0; + size_t mutations = 0; + size_t merges_with_ttl = 0; + }; + /// To calculate min_unprocessed_insert_time, max_processed_insert_time, for which the replica lag is calculated. using InsertsByTime = std::set; @@ -325,7 +332,7 @@ public: bool processEntry(std::function get_zookeeper, LogEntryPtr & entry, const std::function func); /// Count the number of merges and mutations of single parts in the queue. - std::pair countMergesAndPartMutations() const; + OperationsInQueue countMergesAndPartMutations() const; /// Count the total number of active mutations. size_t countMutations() const; diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 7e4318a32f6..05f2f5254f0 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -650,9 +650,14 @@ bool StorageMergeTree::merge( if (partition_id.empty()) { - UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge(); + UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge(MergeType::NORMAL); + UInt64 max_source_parts_size_with_ttl = 0; + + if (!aggressive) + max_source_parts_size_with_ttl = merger_mutator.getMaxSourcePartsSizeForMerge(MergeType::TTL_DELETE); + if (max_source_parts_size > 0) - selected = merger_mutator.selectPartsToMerge(future_part, aggressive, max_source_parts_size, can_merge, out_disable_reason); + selected = merger_mutator.selectPartsToMerge(future_part, aggressive, max_source_parts_size, can_merge, max_source_parts_size_with_ttl, out_disable_reason); else if (out_disable_reason) *out_disable_reason = "Current value of max_source_parts_size is zero"; } 
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 6058632d220..1c880c8c790 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -2514,31 +2514,38 @@ void StorageReplicatedMergeTree::mergeSelectingTask() /// and in the same time, many small parts could be created and won't be merged. auto merges_and_mutations_queued = queue.countMergesAndPartMutations(); - size_t merges_and_mutations_sum = merges_and_mutations_queued.first + merges_and_mutations_queued.second; + size_t merges_and_mutations_sum = merges_and_mutations_queued.merges + merges_and_mutations_queued.mutations; if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue) { LOG_TRACE(log, "Number of queued merges ({}) and part mutations ({})" " is greater than max_replicated_merges_in_queue ({}), so won't select new parts to merge or mutate.", - merges_and_mutations_queued.first, - merges_and_mutations_queued.second, + merges_and_mutations_queued.merges, + merges_and_mutations_queued.mutations, storage_settings_ptr->max_replicated_merges_in_queue); } else { UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge( - storage_settings_ptr->max_replicated_merges_in_queue, merges_and_mutations_sum); + storage_settings_ptr->max_replicated_merges_in_queue, merges_and_mutations_sum, MergeType::NORMAL); + + UInt64 max_source_parts_size_for_merge_with_ttl = 0; + if (merges_and_mutations_queued.merges_with_ttl < storage_settings_ptr->max_replicated_merges_with_ttl_in_queue) + max_source_parts_size_for_merge_with_ttl = merger_mutator.getMaxSourcePartsSizeForMerge( + storage_settings_ptr->max_replicated_merges_in_queue, merges_and_mutations_sum, MergeType::TTL_DELETE); + UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation(); FutureMergedMutatedPart 
future_merged_part; if (max_source_parts_size_for_merge > 0 && - merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, merge_pred, nullptr)) + merger_mutator.selectPartsToMerge(future_merged_part, false, + max_source_parts_size_for_merge, merge_pred, max_source_parts_size_for_merge_with_ttl, nullptr)) { create_result = createLogEntryToMergeParts(zookeeper, future_merged_part.parts, - future_merged_part.name, future_merged_part.type, deduplicate, nullptr, merge_pred.getVersion()); + future_merged_part.name, future_merged_part.type, deduplicate, nullptr, merge_pred.getVersion(), future_merged_part.merge_type); } /// If there are many mutations in queue, it may happen, that we cannot enqueue enough merges to merge all new parts else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0 - && merges_and_mutations_queued.second < storage_settings_ptr->max_replicated_mutations_in_queue) + && merges_and_mutations_queued.mutations < storage_settings_ptr->max_replicated_mutations_in_queue) { /// Choose a part to mutate. 
DataPartsVector data_parts = getDataPartsVector(); @@ -2617,7 +2624,8 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c const MergeTreeDataPartType & merged_part_type, bool deduplicate, ReplicatedMergeTreeLogEntryData * out_log_entry, - int32_t log_version) + int32_t log_version, + MergeType merge_type) { std::vector> exists_futures; exists_futures.reserve(parts.size()); @@ -2649,6 +2657,7 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c entry.source_replica = replica_name; entry.new_part_name = merged_name; entry.new_part_type = merged_part_type; + entry.merge_type = merge_type; entry.deduplicate = deduplicate; entry.create_time = time(nullptr); @@ -3584,7 +3593,7 @@ bool StorageReplicatedMergeTree::optimize( CreateMergeEntryResult create_result = createLogEntryToMergeParts( zookeeper, future_merged_part.parts, future_merged_part.name, future_merged_part.type, deduplicate, - &merge_entry, can_merge.getVersion()); + &merge_entry, can_merge.getVersion(), future_merged_part.merge_type); if (create_result == CreateMergeEntryResult::MissingPart) return handle_noop("Can't create merge queue node in ZooKeeper, because some parts are missing"); @@ -3614,7 +3623,7 @@ bool StorageReplicatedMergeTree::optimize( if (!partition) { selected = merger_mutator.selectPartsToMerge( - future_merged_part, true, storage_settings_ptr->max_bytes_to_merge_at_max_space_in_pool, can_merge, &disable_reason); + future_merged_part, true, storage_settings_ptr->max_bytes_to_merge_at_max_space_in_pool, can_merge, 0, &disable_reason); } else { @@ -3639,7 +3648,7 @@ bool StorageReplicatedMergeTree::optimize( CreateMergeEntryResult create_result = createLogEntryToMergeParts( zookeeper, future_merged_part.parts, future_merged_part.name, future_merged_part.type, deduplicate, - &merge_entry, can_merge.getVersion()); + &merge_entry, can_merge.getVersion(), future_merged_part.merge_type); if (create_result == 
CreateMergeEntryResult::MissingPart) return handle_noop("Can't create merge queue node in ZooKeeper, because some parts are missing"); diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index e9395f20f3f..2bc9265331d 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -450,7 +450,8 @@ private: const MergeTreeDataPartType & merged_part_type, bool deduplicate, ReplicatedMergeTreeLogEntryData * out_log_entry, - int32_t log_version); + int32_t log_version, + MergeType merge_type); CreateMergeEntryResult createLogEntryToMutatePart( const IMergeTreeDataPart & part, diff --git a/src/Storages/System/StorageSystemMerges.cpp b/src/Storages/System/StorageSystemMerges.cpp index 39d22bd00ca..b3bd8f77a89 100644 --- a/src/Storages/System/StorageSystemMerges.cpp +++ b/src/Storages/System/StorageSystemMerges.cpp @@ -30,6 +30,7 @@ NamesAndTypesList StorageSystemMerges::getNamesAndTypes() {"columns_written", std::make_shared()}, {"memory_usage", std::make_shared()}, {"thread_id", std::make_shared()}, + {"merge_type", std::make_shared()}, }; } @@ -65,6 +66,7 @@ void StorageSystemMerges::fillData(MutableColumns & res_columns, const Context & res_columns[i++]->insert(merge.columns_written); res_columns[i++]->insert(merge.memory_usage); res_columns[i++]->insert(merge.thread_id); + res_columns[i++]->insert(merge.merge_type); } }