From a65f23a2be2906aa31a61db727fa244183987c72 Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 8 Nov 2024 13:18:49 +0100 Subject: [PATCH 1/3] Rename method in MergeSelector interface --- .../MergeTree/MergeSelectors/AllMergeSelector.cpp | 2 +- src/Storages/MergeTree/MergeSelectors/AllMergeSelector.h | 2 +- src/Storages/MergeTree/MergeSelectors/MergeSelector.h | 9 ++++++++- .../MergeTree/MergeSelectors/SimpleMergeSelector.cpp | 2 +- .../MergeTree/MergeSelectors/SimpleMergeSelector.h | 2 +- .../MergeTree/MergeSelectors/TTLMergeSelector.cpp | 2 +- src/Storages/MergeTree/MergeSelectors/TTLMergeSelector.h | 2 +- .../MergeTree/MergeSelectors/TrivialMergeSelector.cpp | 2 +- .../MergeTree/MergeSelectors/TrivialMergeSelector.h | 2 +- src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp | 8 ++++---- src/Storages/examples/merge_selector.cpp | 2 +- src/Storages/examples/merge_selector2.cpp | 2 +- 12 files changed, 22 insertions(+), 15 deletions(-) diff --git a/src/Storages/MergeTree/MergeSelectors/AllMergeSelector.cpp b/src/Storages/MergeTree/MergeSelectors/AllMergeSelector.cpp index 5571846f1e6..75717ab0799 100644 --- a/src/Storages/MergeTree/MergeSelectors/AllMergeSelector.cpp +++ b/src/Storages/MergeTree/MergeSelectors/AllMergeSelector.cpp @@ -12,7 +12,7 @@ void registerAllMergeSelector(MergeSelectorFactory & factory) }); } -AllMergeSelector::PartsRange AllMergeSelector::select( +AllMergeSelector::PartsRange AllMergeSelector::selectBest( const PartsRanges & parts_ranges, size_t /*max_total_size_to_merge*/) { diff --git a/src/Storages/MergeTree/MergeSelectors/AllMergeSelector.h b/src/Storages/MergeTree/MergeSelectors/AllMergeSelector.h index 80698c78c5b..b557dc8571c 100644 --- a/src/Storages/MergeTree/MergeSelectors/AllMergeSelector.h +++ b/src/Storages/MergeTree/MergeSelectors/AllMergeSelector.h @@ -11,7 +11,7 @@ class AllMergeSelector : public IMergeSelector { public: /// Parameter max_total_size_to_merge is ignored. - PartsRange select( + PartsRange selectBest( const PartsRanges & parts_ranges, size_t max_total_size_to_merge) override; }; diff --git a/src/Storages/MergeTree/MergeSelectors/MergeSelector.h b/src/Storages/MergeTree/MergeSelectors/MergeSelector.h index 2f17e1e9654..5e680d4b43b 100644 --- a/src/Storages/MergeTree/MergeSelectors/MergeSelector.h +++ b/src/Storages/MergeTree/MergeSelectors/MergeSelector.h @@ -69,10 +69,17 @@ public: /** Function could be called at any frequency and it must decide, should you do any merge at all. * If better not to do any merge, it returns empty result. */ - virtual PartsRange select( + virtual PartsRange selectBest( const PartsRanges & parts_ranges, size_t max_total_size_to_merge) = 0; + virtual PartsRanges selectUpToTopN( + const PartsRanges & parts_ranges, + size_t max_total_size_to_merge, size_t) + { + return {selectBest(parts_ranges, max_total_size_to_merge)}; + } + virtual ~IMergeSelector() = default; }; diff --git a/src/Storages/MergeTree/MergeSelectors/SimpleMergeSelector.cpp b/src/Storages/MergeTree/MergeSelectors/SimpleMergeSelector.cpp index c393349ef32..4201d809df5 100644 --- a/src/Storages/MergeTree/MergeSelectors/SimpleMergeSelector.cpp +++ b/src/Storages/MergeTree/MergeSelectors/SimpleMergeSelector.cpp @@ -243,7 +243,7 @@ void selectWithinPartition( } -SimpleMergeSelector::PartsRange SimpleMergeSelector::select( +SimpleMergeSelector::PartsRange SimpleMergeSelector::selectBest( const PartsRanges & parts_ranges, size_t max_total_size_to_merge) { diff --git a/src/Storages/MergeTree/MergeSelectors/SimpleMergeSelector.h b/src/Storages/MergeTree/MergeSelectors/SimpleMergeSelector.h index 2d4129b8bf8..ef206d59a52 100644 --- a/src/Storages/MergeTree/MergeSelectors/SimpleMergeSelector.h +++ b/src/Storages/MergeTree/MergeSelectors/SimpleMergeSelector.h @@ -169,7 +169,7 @@ public: explicit SimpleMergeSelector(const Settings & settings_) : settings(settings_) {} - PartsRange select( + PartsRange selectBest( const PartsRanges & parts_ranges, size_t max_total_size_to_merge) override; diff --git a/src/Storages/MergeTree/MergeSelectors/TTLMergeSelector.cpp b/src/Storages/MergeTree/MergeSelectors/TTLMergeSelector.cpp index 75e3a090160..051cacd5692 100644 --- a/src/Storages/MergeTree/MergeSelectors/TTLMergeSelector.cpp +++ b/src/Storages/MergeTree/MergeSelectors/TTLMergeSelector.cpp @@ -34,7 +34,7 @@ const String & getPartitionIdForPart(const ITTLMergeSelector::Part & part_info) } -IMergeSelector::PartsRange ITTLMergeSelector::select( +IMergeSelector::PartsRange ITTLMergeSelector::selectBest( const PartsRanges & parts_ranges, size_t max_total_size_to_merge) { diff --git a/src/Storages/MergeTree/MergeSelectors/TTLMergeSelector.h b/src/Storages/MergeTree/MergeSelectors/TTLMergeSelector.h index 4f43f88fe0b..cd188fc34a3 100644 --- a/src/Storages/MergeTree/MergeSelectors/TTLMergeSelector.h +++ b/src/Storages/MergeTree/MergeSelectors/TTLMergeSelector.h @@ -29,7 +29,7 @@ public: { } - PartsRange select( + PartsRange selectBest( const PartsRanges & parts_ranges, size_t max_total_size_to_merge) override; diff --git a/src/Storages/MergeTree/MergeSelectors/TrivialMergeSelector.cpp b/src/Storages/MergeTree/MergeSelectors/TrivialMergeSelector.cpp index cd1fa7b01cd..ca019fda291 100644 --- a/src/Storages/MergeTree/MergeSelectors/TrivialMergeSelector.cpp +++ b/src/Storages/MergeTree/MergeSelectors/TrivialMergeSelector.cpp @@ -18,7 +18,7 @@ void registerTrivialMergeSelector(MergeSelectorFactory & factory) }); } -TrivialMergeSelector::PartsRange TrivialMergeSelector::select( +TrivialMergeSelector::PartsRange TrivialMergeSelector::selectBest( const PartsRanges & parts_ranges, size_t max_total_size_to_merge) { diff --git a/src/Storages/MergeTree/MergeSelectors/TrivialMergeSelector.h b/src/Storages/MergeTree/MergeSelectors/TrivialMergeSelector.h index 6d989aea0fb..a0a80e0e1bb 100644 --- a/src/Storages/MergeTree/MergeSelectors/TrivialMergeSelector.h +++ b/src/Storages/MergeTree/MergeSelectors/TrivialMergeSelector.h @@ -21,7 +21,7 @@ public: size_t num_ranges_to_choose = 100; }; - PartsRange select( + PartsRange selectBest( const PartsRanges & parts_ranges, size_t max_total_size_to_merge) override; diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 37b6539755c..150bb9fbc03 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -511,7 +511,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges( TTLDeleteMergeSelector drop_ttl_selector(params_drop); /// The size of the completely expired part of TTL drop is not affected by the merge pressure and the size of the storage space - parts_to_merge = drop_ttl_selector.select(parts_ranges, (*data_settings)[MergeTreeSetting::max_bytes_to_merge_at_max_space_in_pool]); + parts_to_merge = drop_ttl_selector.selectBest(parts_ranges, (*data_settings)[MergeTreeSetting::max_bytes_to_merge_at_max_space_in_pool]); if (!parts_to_merge.empty()) { future_part->merge_type = MergeType::TTLDelete; @@ -528,7 +528,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges( }; TTLDeleteMergeSelector delete_ttl_selector(params_delete); - parts_to_merge = delete_ttl_selector.select(parts_ranges, max_total_size_to_merge); + parts_to_merge = delete_ttl_selector.selectBest(parts_ranges, max_total_size_to_merge); if (!parts_to_merge.empty()) future_part->merge_type = MergeType::TTLDelete; } @@ -546,7 +546,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges( TTLRecompressMergeSelector recompress_ttl_selector(params); - parts_to_merge = recompress_ttl_selector.select(parts_ranges, max_total_size_to_merge); + parts_to_merge = recompress_ttl_selector.selectBest(parts_ranges, max_total_size_to_merge); if (!parts_to_merge.empty()) future_part->merge_type = MergeType::TTLRecompress; } @@ -584,7 +584,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges( merge_settings = simple_merge_settings; } - parts_to_merge = MergeSelectorFactory::instance().get(merge_selector_algorithm, merge_settings)->select(parts_ranges, max_total_size_to_merge); + parts_to_merge = MergeSelectorFactory::instance().get(merge_selector_algorithm, merge_settings)->selectBest(parts_ranges, max_total_size_to_merge); /// Do not allow to "merge" part with itself for regular merges, unless it is a TTL-merge where it is ok to remove some values with expired ttl if (parts_to_merge.size() == 1) diff --git a/src/Storages/examples/merge_selector.cpp b/src/Storages/examples/merge_selector.cpp index ae545f6cbd5..e5f05f20624 100644 --- a/src/Storages/examples/merge_selector.cpp +++ b/src/Storages/examples/merge_selector.cpp @@ -51,7 +51,7 @@ int main(int, char **) while (parts.size() > 1) { - IMergeSelector::PartsRange selected_parts = selector.select(partitions, 0); + IMergeSelector::PartsRange selected_parts = selector.selectBest(partitions, 0); if (selected_parts.empty()) { diff --git a/src/Storages/examples/merge_selector2.cpp b/src/Storages/examples/merge_selector2.cpp index 291ba19c868..a4ef030d851 100644 --- a/src/Storages/examples/merge_selector2.cpp +++ b/src/Storages/examples/merge_selector2.cpp @@ -48,7 +48,7 @@ int main(int, char **) while (parts.size() > 1) { - IMergeSelector::PartsRange selected_parts = selector.select(partitions, 100ULL * 1024 * 1024 * 1024); + IMergeSelector::PartsRange selected_parts = selector.selectBest(partitions, 100ULL * 1024 * 1024 * 1024); if (selected_parts.empty()) { From caca4b394c0ec63b700df9471c0f4064be4109bd Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 8 Nov 2024 16:18:16 +0100 Subject: [PATCH 2/3] Refactor MergeSelector to return multiple candidate ranges for merge --- src/Storages/MergeTree/MergeList.h | 1 + .../MergeSelectors/SimpleMergeSelector.cpp | 75 ++++++++++++--- .../MergeSelectors/SimpleMergeSelector.h | 5 + .../MergeSelectors/TrivialMergeSelector.cpp | 20 +++- .../MergeSelectors/TrivialMergeSelector.h | 6 +- .../MergeTree/MergeTreeDataMergerMutator.cpp | 95 +++++++++++++------ .../MergeTree/MergeTreeDataMergerMutator.h | 4 +- src/Storages/MergeTree/MergeTreeSettings.cpp | 1 + src/Storages/StorageMergeTree.cpp | 20 ++-- src/Storages/StorageReplicatedMergeTree.cpp | 29 ++++-- 10 files changed, 191 insertions(+), 65 deletions(-) diff --git a/src/Storages/MergeTree/MergeList.h b/src/Storages/MergeTree/MergeList.h index 66190de0ef4..12ae481ced3 100644 --- a/src/Storages/MergeTree/MergeList.h +++ b/src/Storages/MergeTree/MergeList.h @@ -57,6 +57,7 @@ struct MergeInfo struct FutureMergedMutatedPart; using FutureMergedMutatedPartPtr = std::shared_ptr; +using FutureParts = std::vector; struct MergeListElement; using MergeListEntry = BackgroundProcessListEntry; diff --git a/src/Storages/MergeTree/MergeSelectors/SimpleMergeSelector.cpp b/src/Storages/MergeTree/MergeSelectors/SimpleMergeSelector.cpp index 4201d809df5..9a234ff211c 100644 --- a/src/Storages/MergeTree/MergeSelectors/SimpleMergeSelector.cpp +++ b/src/Storages/MergeTree/MergeSelectors/SimpleMergeSelector.cpp @@ -7,7 +7,6 @@ #include #include -#include #include @@ -39,6 +38,10 @@ struct Estimator { using Iterator = SimpleMergeSelector::PartsRange::const_iterator; + explicit Estimator(size_t top_n_ranges_) + : top_n_ranges(top_n_ranges_) + {} + void consider(Iterator begin, Iterator end, size_t sum_size, size_t size_prev_at_left, const SimpleMergeSelector::Settings & settings) { double current_score = score(end - begin, sum_size, settings.size_fixed_cost_to_add); @@ -56,17 +59,36 @@ struct Estimator while (end >= begin + 3 && (end - 1)->size < settings.heuristic_to_remove_small_parts_at_right_max_ratio * sum_size) --end; - if (min_score == 0.0 || current_score < min_score) - { - min_score = current_score; - best_begin = begin; - best_end = end; - } + while (best_ranges.size() > top_n_ranges) + best_ranges.pop(); + + best_ranges.push(ScoredRange{current_score, begin, end}); } SimpleMergeSelector::PartsRange getBest() const { - return SimpleMergeSelector::PartsRange(best_begin, best_end); + if (!best_ranges.empty()) + { + const auto & top = best_ranges.top(); + return SimpleMergeSelector::PartsRange(top.begin, top.end); + } + return SimpleMergeSelector::PartsRange{}; + } + + SimpleMergeSelector::PartsRanges getUpToTopN() const + { + SimpleMergeSelector::PartsRanges result; + result.reserve(best_ranges.size()); + while (!best_ranges.empty()) + { + const auto & top_range = best_ranges.top(); + result.push_back(SimpleMergeSelector::PartsRange(top_range.begin, top_range.end)); + best_ranges.pop(); + } + if (result.empty()) + result.push_back(SimpleMergeSelector::PartsRange{}); + + return result; } static double score(double count, double sum_size, double sum_size_fixed_cost) @@ -89,9 +111,21 @@ struct Estimator return (sum_size + sum_size_fixed_cost * count) / (count - 1.9); } - double min_score = 0.0; - Iterator best_begin {}; - Iterator best_end {}; + struct ScoredRange + { + double score = 0.0; + Iterator begin {}; + Iterator end {}; + + bool operator>(const ScoredRange & other) const + { + return score > other.score; + } + }; + + + mutable std::priority_queue, std::greater<>> best_ranges; + size_t top_n_ranges; }; @@ -247,7 +281,7 @@ SimpleMergeSelector::PartsRange SimpleMergeSelector::selectBest( const PartsRanges & parts_ranges, size_t max_total_size_to_merge) { - Estimator estimator; + Estimator estimator(1); /// Precompute logarithm of settings boundaries, because log function is quite expensive in terms of performance const double min_size_to_lower_base_log = log(1 + settings.min_size_to_lower_base); @@ -259,4 +293,21 @@ SimpleMergeSelector::PartsRange SimpleMergeSelector::selectBest( return estimator.getBest(); } +SimpleMergeSelector::PartsRanges SimpleMergeSelector::selectUpToTopN( + const PartsRanges & parts_ranges, + size_t max_total_size_to_merge, size_t top_n_ranges) +{ + Estimator estimator(top_n_ranges); + /// Precompute logarithm of settings boundaries, because log function is quite expensive in terms of performance + const double min_size_to_lower_base_log = log(1 + settings.min_size_to_lower_base); + const double max_size_to_lower_base_log = log(1 + settings.max_size_to_lower_base); + + for (const auto & part_range : parts_ranges) + selectWithinPartition(part_range, max_total_size_to_merge, estimator, settings, min_size_to_lower_base_log, max_size_to_lower_base_log); + + return estimator.getUpToTopN(); + +} + + } diff --git a/src/Storages/MergeTree/MergeSelectors/SimpleMergeSelector.h b/src/Storages/MergeTree/MergeSelectors/SimpleMergeSelector.h index ef206d59a52..9d247f244a2 100644 --- a/src/Storages/MergeTree/MergeSelectors/SimpleMergeSelector.h +++ b/src/Storages/MergeTree/MergeSelectors/SimpleMergeSelector.h @@ -173,6 +173,11 @@ public: const PartsRanges & parts_ranges, size_t max_total_size_to_merge) override; + PartsRanges selectUpToTopN( + const PartsRanges & parts_ranges, + size_t max_total_size_to_merge, size_t top_n_ranges) override; + + private: const Settings settings; }; diff --git a/src/Storages/MergeTree/MergeSelectors/TrivialMergeSelector.cpp b/src/Storages/MergeTree/MergeSelectors/TrivialMergeSelector.cpp index ca019fda291..b4640aab5a0 100644 --- a/src/Storages/MergeTree/MergeSelectors/TrivialMergeSelector.cpp +++ b/src/Storages/MergeTree/MergeSelectors/TrivialMergeSelector.cpp @@ -18,9 +18,9 @@ void registerTrivialMergeSelector(MergeSelectorFactory & factory) }); } -TrivialMergeSelector::PartsRange TrivialMergeSelector::selectBest( +TrivialMergeSelector::PartsRanges TrivialMergeSelector::selectUpToTopN( const PartsRanges & parts_ranges, - size_t max_total_size_to_merge) + size_t max_total_size_to_merge, size_t top_n_ranges) { size_t num_partitions = parts_ranges.size(); if (num_partitions == 0) @@ -44,11 +44,11 @@ TrivialMergeSelector::PartsRange TrivialMergeSelector::selectBest( size_t right = 0; std::vector candidates; - while (candidates.size() < settings.num_ranges_to_choose) + while (candidates.size() < top_n_ranges) { const PartsRange & partition = parts_ranges[partition_idx]; - if (1 + right - left == settings.num_parts_to_merge) + if (1 + right - left == top_n_ranges) { ++right; @@ -59,7 +59,7 @@ TrivialMergeSelector::PartsRange TrivialMergeSelector::selectBest( if (!max_total_size_to_merge || total_size <= max_total_size_to_merge) { candidates.emplace_back(partition.data() + left, partition.data() + right); - if (candidates.size() == settings.num_ranges_to_choose) + if (candidates.size() == settings.num_ranges_to_consider) break; } @@ -82,6 +82,16 @@ TrivialMergeSelector::PartsRange TrivialMergeSelector::selectBest( left = right; } + return candidates; +} + + +TrivialMergeSelector::PartsRange TrivialMergeSelector::selectBest( + const PartsRanges & parts_ranges, + size_t max_total_size_to_merge) +{ + auto candidates = selectUpToTopN(parts_ranges, max_total_size_to_merge, settings.num_ranges_to_consider); + if (candidates.empty()) return {}; diff --git a/src/Storages/MergeTree/MergeSelectors/TrivialMergeSelector.h b/src/Storages/MergeTree/MergeSelectors/TrivialMergeSelector.h index a0a80e0e1bb..88f88458ae5 100644 --- a/src/Storages/MergeTree/MergeSelectors/TrivialMergeSelector.h +++ b/src/Storages/MergeTree/MergeSelectors/TrivialMergeSelector.h @@ -18,13 +18,17 @@ public: struct Settings { size_t num_parts_to_merge = 10; - size_t num_ranges_to_choose = 100; + size_t num_ranges_to_consider = 100; }; PartsRange selectBest( const PartsRanges & parts_ranges, size_t max_total_size_to_merge) override; + PartsRanges selectUpToTopN( + const PartsRanges & parts_ranges, + size_t max_total_size_to_merge, size_t top_n_ranges) override; + private: const Settings settings; }; diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 150bb9fbc03..2fb943fb105 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -82,6 +82,7 @@ namespace MergeTreeSetting extern const MergeTreeSettingsMergeSelectorAlgorithm merge_selector_algorithm; extern const MergeTreeSettingsBool merge_selector_enable_heuristic_to_remove_small_parts_at_right; extern const MergeTreeSettingsFloat merge_selector_base; + extern const MergeTreeSettingsUInt64 merge_selector_max_ranges_to_select_at_once; } namespace ErrorCodes @@ -164,7 +165,7 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation() const } SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( - FutureMergedMutatedPartPtr future_part, + FutureParts & out_future_parts, bool aggressive, size_t max_total_size_to_merge, const AllowedMergingPredicate & can_merge_callback, @@ -191,7 +192,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( return SelectPartsDecision::CANNOT_SELECT; } - auto res = selectPartsToMergeFromRanges(future_part, aggressive, max_total_size_to_merge, merge_with_ttl_allowed, + auto res = selectPartsToMergeFromRanges(out_future_parts, aggressive, max_total_size_to_merge, merge_with_ttl_allowed, metadata_snapshot, info.parts_ranges, info.current_time, out_disable_reason); if (res == SelectPartsDecision::SELECTED) @@ -200,7 +201,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( String best_partition_id_to_optimize = getBestPartitionToOptimizeEntire(info.partitions_info); if (!best_partition_id_to_optimize.empty()) { - return selectAllPartsToMergeWithinPartition( + auto future_part = std::make_shared(); + res = selectAllPartsToMergeWithinPartition( future_part, can_merge_callback, best_partition_id_to_optimize, @@ -209,6 +211,10 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( txn, out_disable_reason, /*optimize_skip_merged_partitions=*/true); + + out_future_parts.emplace_back(future_part); + + return res; } if (!out_disable_reason.text.empty()) @@ -257,11 +263,11 @@ MergeTreeDataMergerMutator::PartitionIdsHint MergeTreeDataMergerMutator::getPart for (size_t i = 0; i < all_partition_ids.size(); ++i) { - auto future_part = std::make_shared(); + FutureParts out_future_parts; PreformattedMessage out_disable_reason; /// This method should have been const, but something went wrong... it's const with dry_run = true auto status = const_cast(this)->selectPartsToMergeFromRanges( - future_part, /*aggressive*/ false, max_total_size_to_merge, merge_with_ttl_allowed, + out_future_parts, /*aggressive*/ false, max_total_size_to_merge, merge_with_ttl_allowed, metadata_snapshot, ranges_per_partition[i], info.current_time, out_disable_reason, /* dry_run */ true); if (status == SelectPartsDecision::SELECTED) @@ -482,7 +488,7 @@ MergeTreeDataMergerMutator::MergeSelectingInfo MergeTreeDataMergerMutator::getPo } SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges( - FutureMergedMutatedPartPtr future_part, + FutureParts & out_future_parts, bool aggressive, size_t max_total_size_to_merge, bool merge_with_ttl_allowed, @@ -494,7 +500,18 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges( { Stopwatch select_parts_from_ranges_timer; const auto data_settings = data.getSettings(); - IMergeSelector::PartsRange parts_to_merge; + auto get_parts_from_range = [] (const IMergeSelector::PartsRange & parts_to_merge) + { + MergeTreeData::DataPartsVector parts; + parts.reserve(parts_to_merge.size()); + for (const IMergeSelector::Part & part_info : parts_to_merge) + { + const MergeTreeData::DataPartPtr & part = part_info.getDataPartPtr(); + parts.push_back(part); + } + return parts; + }; + if (metadata_snapshot->hasAnyTTL() && merge_with_ttl_allowed && !ttl_merges_blocker.isCancelled()) { @@ -511,10 +528,13 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges( TTLDeleteMergeSelector drop_ttl_selector(params_drop); /// The size of the completely expired part of TTL drop is not affected by the merge pressure and the size of the storage space - parts_to_merge = drop_ttl_selector.selectBest(parts_ranges, (*data_settings)[MergeTreeSetting::max_bytes_to_merge_at_max_space_in_pool]); + auto parts_to_merge = drop_ttl_selector.selectBest(parts_ranges, (*data_settings)[MergeTreeSetting::max_bytes_to_merge_at_max_space_in_pool]); if (!parts_to_merge.empty()) { + auto future_part = std::make_shared(); future_part->merge_type = MergeType::TTLDelete; + future_part->assign(get_parts_from_range(parts_to_merge)); + out_future_parts.emplace_back(future_part); } else if (!(*data_settings)[MergeTreeSetting::ttl_only_drop_parts]) { @@ -530,7 +550,12 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges( parts_to_merge = delete_ttl_selector.selectBest(parts_ranges, max_total_size_to_merge); if (!parts_to_merge.empty()) + { + auto future_part = std::make_shared(); future_part->merge_type = MergeType::TTLDelete; + future_part->assign(get_parts_from_range(parts_to_merge)); + out_future_parts.emplace_back(future_part); + } } if (parts_to_merge.empty() && metadata_snapshot->hasAnyRecompressionTTL()) @@ -548,11 +573,17 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges( parts_to_merge = recompress_ttl_selector.selectBest(parts_ranges, max_total_size_to_merge); if (!parts_to_merge.empty()) + { + auto future_part = std::make_shared(); future_part->merge_type = MergeType::TTLRecompress; + future_part->assign(get_parts_from_range(parts_to_merge)); + out_future_parts.emplace_back(future_part); + } } } - if (parts_to_merge.empty()) + /// Nothing is selected with specialized selectors + if (out_future_parts.empty()) { auto merge_selector_algorithm = (*data_settings)[MergeTreeSetting::merge_selector_algorithm]; @@ -584,32 +615,34 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges( merge_settings = simple_merge_settings; } - parts_to_merge = MergeSelectorFactory::instance().get(merge_selector_algorithm, merge_settings)->selectBest(parts_ranges, max_total_size_to_merge); + auto parts_to_merge_ranges = MergeSelectorFactory::instance().get(merge_selector_algorithm, merge_settings)->selectUpToTopN(parts_ranges, max_total_size_to_merge, (*data_settings)[MergeTreeSetting::merge_selector_max_ranges_to_select_at_once]); - /// Do not allow to "merge" part with itself for regular merges, unless it is a TTL-merge where it is ok to remove some values with expired ttl - if (parts_to_merge.size() == 1) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Merge selector returned only one part to merge"); - - if (parts_to_merge.empty()) + for (const auto & parts_to_merge : parts_to_merge_ranges) { - ProfileEvents::increment(ProfileEvents::MergerMutatorSelectPartsForMergeElapsedMicroseconds, select_parts_from_ranges_timer.elapsedMicroseconds()); - out_disable_reason = PreformattedMessage::create("Did not find any parts to merge (with usual merge selectors) in {}ms", select_parts_from_ranges_timer.elapsedMicroseconds() / 1000); - return SelectPartsDecision::CANNOT_SELECT; + /// Do not allow to "merge" part with itself for regular merges, unless it is a TTL-merge where it is ok to remove some values with expired ttl + if (parts_to_merge.size() == 1) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Merge selector returned only one part to merge"); + + if (parts_to_merge.empty()) + { + chassert(parts_to_merge_ranges.size() == 1); + ProfileEvents::increment(ProfileEvents::MergerMutatorSelectPartsForMergeElapsedMicroseconds, select_parts_from_ranges_timer.elapsedMicroseconds()); + out_disable_reason = PreformattedMessage::create("Did not find any parts to merge (with usual merge selectors) in {}ms", select_parts_from_ranges_timer.elapsedMicroseconds() / 1000); + return SelectPartsDecision::CANNOT_SELECT; + } + else + { + auto future_part = std::make_shared(); + future_part->merge_type = MergeType::Regular; + auto parts = get_parts_from_range(parts_to_merge); + ProfileEvents::increment(ProfileEvents::MergerMutatorSelectRangePartsCount, parts.size()); + LOG_DEBUG(log, "Selected {} parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name); + future_part->assign(std::move(parts)); + out_future_parts.emplace_back(future_part); + } } } - - MergeTreeData::DataPartsVector parts; - parts.reserve(parts_to_merge.size()); - for (IMergeSelector::Part & part_info : parts_to_merge) - { - const MergeTreeData::DataPartPtr & part = part_info.getDataPartPtr(); - parts.push_back(part); - } - - LOG_DEBUG(log, "Selected {} parts from {} to {} in {}ms", parts.size(), parts.front()->name, parts.back()->name, select_parts_from_ranges_timer.elapsedMicroseconds() / 1000); - ProfileEvents::increment(ProfileEvents::MergerMutatorSelectRangePartsCount, parts.size()); - - future_part->assign(std::move(parts)); + LOG_DEBUG(log, "Selected {} parts ranges in {}ms", out_future_parts.size(), select_parts_from_ranges_timer.elapsedMicroseconds() / 1000); ProfileEvents::increment(ProfileEvents::MergerMutatorSelectPartsForMergeElapsedMicroseconds, select_parts_from_ranges_timer.elapsedMicroseconds()); return SelectPartsDecision::SELECTED; } diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 6d209b9f931..fe446f3256c 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -96,7 +96,7 @@ public: /// The third step of selecting parts to merge: takes ranges that we can merge, and selects parts that we want to merge SelectPartsDecision selectPartsToMergeFromRanges( - FutureMergedMutatedPartPtr future_part, + FutureParts & out_future_parts, bool aggressive, size_t max_total_size_to_merge, bool merge_with_ttl_allowed, @@ -125,7 +125,7 @@ public: * - A part that already merges with something in one place, you can not start to merge into something else in another place. */ SelectPartsDecision selectPartsToMerge( - FutureMergedMutatedPartPtr future_part, + FutureParts & out_future_parts, bool aggressive, size_t max_total_size_to_merge, const AllowedMergingPredicate & can_merge, diff --git a/src/Storages/MergeTree/MergeTreeSettings.cpp b/src/Storages/MergeTree/MergeTreeSettings.cpp index 33910d1048d..d79594eacb9 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.cpp +++ b/src/Storages/MergeTree/MergeTreeSettings.cpp @@ -58,6 +58,7 @@ namespace ErrorCodes /** Merge selector settings. */ \ DECLARE(UInt64, merge_selector_blurry_base_scale_factor, 0, "Controls when the logic kicks in relatively to the number of parts in partition. The bigger the factor the more belated reaction will be.", 0) \ DECLARE(UInt64, merge_selector_window_size, 1000, "How many parts to look at once.", 0) \ + DECLARE(UInt64, merge_selector_max_ranges_to_select_at_once, 10, "How many ranges merge selector will return per one invocation. Work only for SimpleMergeSelector and StochasticSimpleMergeSelector.", 0) \ \ /** Merge settings. */ \ DECLARE(UInt64, merge_max_block_size, 8192, "How many rows in blocks should be formed for merge operations. By default has the same value as `index_granularity`.", 0) \ diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 1ba0617d8ae..52157e972a4 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -996,10 +996,12 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge( { auto data_settings = getSettings(); - auto future_part = std::make_shared(); + FutureParts future_parts; + + UUID part_uuid = UUIDHelpers::Nil; if ((*storage_settings.get())[MergeTreeSetting::assign_part_uuids]) - future_part->uuid = UUIDHelpers::generateV4(); + part_uuid = UUIDHelpers::generateV4(); /// You must call destructor with unlocked `currently_processing_in_background_mutex`. CurrentlyMergingPartsTaggerPtr merging_tagger; @@ -1089,7 +1091,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge( if (max_source_parts_size > 0) { select_decision = merger_mutator.selectPartsToMerge( - future_part, + future_parts, aggressive, max_source_parts_size, can_merge, @@ -1126,6 +1128,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge( break; } + auto future_part = std::make_shared(); select_decision = merger_mutator.selectAllPartsToMergeWithinPartition( future_part, can_merge, partition_id, final, metadata_snapshot, txn, out_disable_reason, optimize_skip_merged_partitions); @@ -1144,7 +1147,10 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge( } } else + { + future_parts.push_back(future_part); break; + } } } @@ -1162,12 +1168,14 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge( return {}; } + auto best_future_part = future_parts.front(); + best_future_part->uuid = part_uuid; /// Account TTL merge here to avoid exceeding the max_number_of_merges_with_ttl_in_pool limit - if (isTTLMergeType(future_part->merge_type)) + if (isTTLMergeType(best_future_part->merge_type)) getContext()->getMergeList().bookMergeWithTTL(); - merging_tagger = std::make_unique(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part->parts, true), *this, metadata_snapshot, false); - return std::make_shared(future_part, std::move(merging_tagger), std::make_shared()); + merging_tagger = std::make_unique(best_future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(best_future_part->parts, true), *this, metadata_snapshot, false); + return std::make_shared(best_future_part, std::move(merging_tagger), std::make_shared()); } bool StorageMergeTree::merge( diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 793fd02c656..b979cb7417a 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -3919,9 +3919,6 @@ void StorageReplicatedMergeTree::mergeSelectingTask() bool merge_with_ttl_allowed = merges_and_mutations_queued.merges_with_ttl < (*storage_settings_ptr)[MergeTreeSetting::max_replicated_merges_with_ttl_in_queue] && getTotalMergesWithTTLInMergeList() < (*storage_settings_ptr)[MergeTreeSetting::max_number_of_merges_with_ttl_in_pool]; - auto future_merged_part = std::make_shared(); - if ((*storage_settings.get())[MergeTreeSetting::assign_part_uuids]) - future_merged_part->uuid = UUIDHelpers::generateV4(); bool can_assign_merge = max_source_parts_size_for_merge > 0; PartitionIdsHint partitions_to_merge_in; @@ -3936,11 +3933,20 @@ void StorageReplicatedMergeTree::mergeSelectingTask() merge_pred.emplace(queue.getMergePredicate(zookeeper, partitions_to_merge_in)); } + UUID part_uuid = UUIDHelpers::Nil; + + if ((*storage_settings.get())[MergeTreeSetting::assign_part_uuids]) + part_uuid = UUIDHelpers::generateV4(); + + FutureParts future_parts; PreformattedMessage out_reason; if (can_assign_merge && - merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, *merge_pred, + merger_mutator.selectPartsToMerge(future_parts, false, max_source_parts_size_for_merge, *merge_pred, merge_with_ttl_allowed, NO_TRANSACTION_PTR, out_reason, &partitions_to_merge_in) == SelectPartsDecision::SELECTED) { + auto future_merged_part = future_parts.front(); + future_merged_part->uuid = part_uuid; + create_result = createLogEntryToMergeParts( zookeeper, future_merged_part->parts, @@ -3984,7 +3990,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask() create_result = createLogEntryToMutatePart( *part, - future_merged_part->uuid, + part_uuid, desired_mutation_version->first, desired_mutation_version->second, merge_pred->getVersion()); @@ -5944,22 +5950,24 @@ bool StorageReplicatedMergeTree::optimize( } ReplicatedMergeTreeMergePredicate can_merge = queue.getMergePredicate(zookeeper, std::move(partition_ids_hint)); - auto future_merged_part = std::make_shared(); + auto part_uuid = UUIDHelpers::Nil; if ((*storage_settings.get())[MergeTreeSetting::assign_part_uuids]) - future_merged_part->uuid = UUIDHelpers::generateV4(); + part_uuid = UUIDHelpers::generateV4(); constexpr const char * unknown_disable_reason = "unknown reason"; PreformattedMessage disable_reason = PreformattedMessage::create(unknown_disable_reason); SelectPartsDecision select_decision = SelectPartsDecision::CANNOT_SELECT; + FutureParts future_parts; if (partition_id.empty()) { select_decision = merger_mutator.selectPartsToMerge( - future_merged_part, /* aggressive */ true, (*storage_settings_ptr)[MergeTreeSetting::max_bytes_to_merge_at_max_space_in_pool], + future_parts, /* aggressive */ true, (*storage_settings_ptr)[MergeTreeSetting::max_bytes_to_merge_at_max_space_in_pool], can_merge, /* merge_with_ttl_allowed */ false, NO_TRANSACTION_PTR, disable_reason); } else { + auto future_merged_part = std::make_shared(); select_decision = merger_mutator.selectAllPartsToMergeWithinPartition( future_merged_part, can_merge, @@ -5969,6 +5977,9 @@ bool StorageReplicatedMergeTree::optimize( NO_TRANSACTION_PTR, disable_reason, query_context->getSettingsRef()[Setting::optimize_skip_merged_partitions]); + + if (select_decision == SelectPartsDecision::SELECTED) + future_parts.push_back(future_merged_part); } /// If there is nothing to merge then we treat this merge as successful (needed for optimize final optimization) @@ -5995,6 +6006,8 @@ bool StorageReplicatedMergeTree::optimize( return handle_noop(message_fmt, disable_reason.text); } + auto future_merged_part = future_parts.front(); + future_merged_part->uuid = part_uuid; ReplicatedMergeTreeLogEntryData merge_entry; CreateMergeEntryResult create_result = createLogEntryToMergeParts( zookeeper, future_merged_part->parts, From d414dea9b9032fba5d09250dbd175b11cc24df2e Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 8 Nov 2024 17:26:18 +0100 Subject: [PATCH 3/3] Fix crash --- src/Storages/StorageMergeTree.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 52157e972a4..88cfc08f236 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -1132,6 +1132,8 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge( select_decision = merger_mutator.selectAllPartsToMergeWithinPartition( future_part, can_merge, partition_id, final, metadata_snapshot, txn, out_disable_reason, optimize_skip_merged_partitions); + future_parts.push_back(future_part); + /// If final - we will wait for currently processing merges to finish and continue. if (final && select_decision != SelectPartsDecision::SELECTED @@ -1148,7 +1150,6 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge( } else { - future_parts.push_back(future_part); break; } }