This commit is contained in:
alesapin 2024-11-20 15:16:27 -08:00 committed by GitHub
commit 2802d3d73c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 211 additions and 77 deletions

View File

@ -57,6 +57,7 @@ struct MergeInfo
struct FutureMergedMutatedPart; struct FutureMergedMutatedPart;
using FutureMergedMutatedPartPtr = std::shared_ptr<FutureMergedMutatedPart>; using FutureMergedMutatedPartPtr = std::shared_ptr<FutureMergedMutatedPart>;
using FutureParts = std::vector<FutureMergedMutatedPartPtr>;
struct MergeListElement; struct MergeListElement;
using MergeListEntry = BackgroundProcessListEntry<MergeListElement, MergeInfo>; using MergeListEntry = BackgroundProcessListEntry<MergeListElement, MergeInfo>;

View File

@ -12,7 +12,7 @@ void registerAllMergeSelector(MergeSelectorFactory & factory)
}); });
} }
AllMergeSelector::PartsRange AllMergeSelector::select( AllMergeSelector::PartsRange AllMergeSelector::selectBest(
const PartsRanges & parts_ranges, const PartsRanges & parts_ranges,
size_t /*max_total_size_to_merge*/) size_t /*max_total_size_to_merge*/)
{ {

View File

@ -11,7 +11,7 @@ class AllMergeSelector : public IMergeSelector
{ {
public: public:
/// Parameter max_total_size_to_merge is ignored. /// Parameter max_total_size_to_merge is ignored.
PartsRange select( PartsRange selectBest(
const PartsRanges & parts_ranges, const PartsRanges & parts_ranges,
size_t max_total_size_to_merge) override; size_t max_total_size_to_merge) override;
}; };

View File

@ -69,10 +69,17 @@ public:
/** Function could be called at any frequency and it must decide, should you do any merge at all. /** Function could be called at any frequency and it must decide, should you do any merge at all.
* If better not to do any merge, it returns empty result. * If better not to do any merge, it returns empty result.
*/ */
virtual PartsRange select( virtual PartsRange selectBest(
const PartsRanges & parts_ranges, const PartsRanges & parts_ranges,
size_t max_total_size_to_merge) = 0; size_t max_total_size_to_merge) = 0;
virtual PartsRanges selectUpToTopN(
const PartsRanges & parts_ranges,
size_t max_total_size_to_merge, size_t)
{
return {selectBest(parts_ranges, max_total_size_to_merge)};
}
virtual ~IMergeSelector() = default; virtual ~IMergeSelector() = default;
}; };

View File

@ -7,7 +7,6 @@
#include <cmath> #include <cmath>
#include <cassert> #include <cassert>
#include <iostream>
#include <random> #include <random>
@ -39,6 +38,10 @@ struct Estimator
{ {
using Iterator = SimpleMergeSelector::PartsRange::const_iterator; using Iterator = SimpleMergeSelector::PartsRange::const_iterator;
explicit Estimator(size_t top_n_ranges_)
: top_n_ranges(top_n_ranges_)
{}
void consider(Iterator begin, Iterator end, size_t sum_size, size_t size_prev_at_left, const SimpleMergeSelector::Settings & settings) void consider(Iterator begin, Iterator end, size_t sum_size, size_t size_prev_at_left, const SimpleMergeSelector::Settings & settings)
{ {
double current_score = score(end - begin, sum_size, settings.size_fixed_cost_to_add); double current_score = score(end - begin, sum_size, settings.size_fixed_cost_to_add);
@ -56,17 +59,36 @@ struct Estimator
while (end >= begin + 3 && (end - 1)->size < settings.heuristic_to_remove_small_parts_at_right_max_ratio * sum_size) while (end >= begin + 3 && (end - 1)->size < settings.heuristic_to_remove_small_parts_at_right_max_ratio * sum_size)
--end; --end;
if (min_score == 0.0 || current_score < min_score) while (best_ranges.size() > top_n_ranges)
{ best_ranges.pop();
min_score = current_score;
best_begin = begin; best_ranges.push(ScoredRange{current_score, begin, end});
best_end = end;
}
} }
SimpleMergeSelector::PartsRange getBest() const SimpleMergeSelector::PartsRange getBest() const
{ {
return SimpleMergeSelector::PartsRange(best_begin, best_end); if (!best_ranges.empty())
{
const auto & top = best_ranges.top();
return SimpleMergeSelector::PartsRange(top.begin, top.end);
}
return SimpleMergeSelector::PartsRange{};
}
SimpleMergeSelector::PartsRanges getUpToTopN() const
{
SimpleMergeSelector::PartsRanges result;
result.reserve(best_ranges.size());
while (!best_ranges.empty())
{
const auto & top_range = best_ranges.top();
result.push_back(SimpleMergeSelector::PartsRange(top_range.begin, top_range.end));
best_ranges.pop();
}
if (result.empty())
result.push_back(SimpleMergeSelector::PartsRange{});
return result;
} }
static double score(double count, double sum_size, double sum_size_fixed_cost) static double score(double count, double sum_size, double sum_size_fixed_cost)
@ -89,9 +111,21 @@ struct Estimator
return (sum_size + sum_size_fixed_cost * count) / (count - 1.9); return (sum_size + sum_size_fixed_cost * count) / (count - 1.9);
} }
double min_score = 0.0; struct ScoredRange
Iterator best_begin {}; {
Iterator best_end {}; double score = 0.0;
Iterator begin {};
Iterator end {};
bool operator>(const ScoredRange & other) const
{
return score > other.score;
}
};
mutable std::priority_queue<ScoredRange, std::vector<ScoredRange>, std::greater<>> best_ranges;
size_t top_n_ranges;
}; };
@ -246,11 +280,11 @@ void selectWithinPartition(
} }
SimpleMergeSelector::PartsRange SimpleMergeSelector::select( SimpleMergeSelector::PartsRange SimpleMergeSelector::selectBest(
const PartsRanges & parts_ranges, const PartsRanges & parts_ranges,
size_t max_total_size_to_merge) size_t max_total_size_to_merge)
{ {
Estimator estimator; Estimator estimator(1);
/// Precompute logarithm of settings boundaries, because log function is quite expensive in terms of performance /// Precompute logarithm of settings boundaries, because log function is quite expensive in terms of performance
const double min_size_to_lower_base_log = log(1 + settings.min_size_to_lower_base); const double min_size_to_lower_base_log = log(1 + settings.min_size_to_lower_base);
@ -262,4 +296,21 @@ SimpleMergeSelector::PartsRange SimpleMergeSelector::select(
return estimator.getBest(); return estimator.getBest();
} }
SimpleMergeSelector::PartsRanges SimpleMergeSelector::selectUpToTopN(
const PartsRanges & parts_ranges,
size_t max_total_size_to_merge, size_t top_n_ranges)
{
Estimator estimator(top_n_ranges);
/// Precompute logarithm of settings boundaries, because log function is quite expensive in terms of performance
const double min_size_to_lower_base_log = log(1 + settings.min_size_to_lower_base);
const double max_size_to_lower_base_log = log(1 + settings.max_size_to_lower_base);
for (const auto & part_range : parts_ranges)
selectWithinPartition(part_range, max_total_size_to_merge, estimator, settings, min_size_to_lower_base_log, max_size_to_lower_base_log);
return estimator.getUpToTopN();
}
} }

View File

@ -171,10 +171,15 @@ public:
explicit SimpleMergeSelector(const Settings & settings_) : settings(settings_) {} explicit SimpleMergeSelector(const Settings & settings_) : settings(settings_) {}
PartsRange select( PartsRange selectBest(
const PartsRanges & parts_ranges, const PartsRanges & parts_ranges,
size_t max_total_size_to_merge) override; size_t max_total_size_to_merge) override;
PartsRanges selectUpToTopN(
const PartsRanges & parts_ranges,
size_t max_total_size_to_merge, size_t top_n_ranges) override;
private: private:
const Settings settings; const Settings settings;
}; };

View File

@ -34,7 +34,7 @@ const String & getPartitionIdForPart(const ITTLMergeSelector::Part & part_info)
} }
IMergeSelector::PartsRange ITTLMergeSelector::select( IMergeSelector::PartsRange ITTLMergeSelector::selectBest(
const PartsRanges & parts_ranges, const PartsRanges & parts_ranges,
size_t max_total_size_to_merge) size_t max_total_size_to_merge)
{ {

View File

@ -29,7 +29,7 @@ public:
{ {
} }
PartsRange select( PartsRange selectBest(
const PartsRanges & parts_ranges, const PartsRanges & parts_ranges,
size_t max_total_size_to_merge) override; size_t max_total_size_to_merge) override;

View File

@ -18,9 +18,9 @@ void registerTrivialMergeSelector(MergeSelectorFactory & factory)
}); });
} }
TrivialMergeSelector::PartsRange TrivialMergeSelector::select( TrivialMergeSelector::PartsRanges TrivialMergeSelector::selectUpToTopN(
const PartsRanges & parts_ranges, const PartsRanges & parts_ranges,
size_t max_total_size_to_merge) size_t max_total_size_to_merge, size_t top_n_ranges)
{ {
size_t num_partitions = parts_ranges.size(); size_t num_partitions = parts_ranges.size();
if (num_partitions == 0) if (num_partitions == 0)
@ -44,11 +44,11 @@ TrivialMergeSelector::PartsRange TrivialMergeSelector::select(
size_t right = 0; size_t right = 0;
std::vector<PartsRange> candidates; std::vector<PartsRange> candidates;
while (candidates.size() < settings.num_ranges_to_choose) while (candidates.size() < top_n_ranges)
{ {
const PartsRange & partition = parts_ranges[partition_idx]; const PartsRange & partition = parts_ranges[partition_idx];
if (1 + right - left == settings.num_parts_to_merge) if (1 + right - left == top_n_ranges)
{ {
++right; ++right;
@ -59,7 +59,7 @@ TrivialMergeSelector::PartsRange TrivialMergeSelector::select(
if (!max_total_size_to_merge || total_size <= max_total_size_to_merge) if (!max_total_size_to_merge || total_size <= max_total_size_to_merge)
{ {
candidates.emplace_back(partition.data() + left, partition.data() + right); candidates.emplace_back(partition.data() + left, partition.data() + right);
if (candidates.size() == settings.num_ranges_to_choose) if (candidates.size() == settings.num_ranges_to_consider)
break; break;
} }
@ -82,6 +82,16 @@ TrivialMergeSelector::PartsRange TrivialMergeSelector::select(
left = right; left = right;
} }
return candidates;
}
TrivialMergeSelector::PartsRange TrivialMergeSelector::selectBest(
const PartsRanges & parts_ranges,
size_t max_total_size_to_merge)
{
auto candidates = selectUpToTopN(parts_ranges, max_total_size_to_merge, settings.num_ranges_to_consider);
if (candidates.empty()) if (candidates.empty())
return {}; return {};

View File

@ -18,13 +18,17 @@ public:
struct Settings struct Settings
{ {
size_t num_parts_to_merge = 10; size_t num_parts_to_merge = 10;
size_t num_ranges_to_choose = 100; size_t num_ranges_to_consider = 100;
}; };
PartsRange select( PartsRange selectBest(
const PartsRanges & parts_ranges, const PartsRanges & parts_ranges,
size_t max_total_size_to_merge) override; size_t max_total_size_to_merge) override;
PartsRanges selectUpToTopN(
const PartsRanges & parts_ranges,
size_t max_total_size_to_merge, size_t top_n_ranges) override;
private: private:
const Settings settings; const Settings settings;
}; };

View File

@ -82,6 +82,7 @@ namespace MergeTreeSetting
extern const MergeTreeSettingsMergeSelectorAlgorithm merge_selector_algorithm; extern const MergeTreeSettingsMergeSelectorAlgorithm merge_selector_algorithm;
extern const MergeTreeSettingsBool merge_selector_enable_heuristic_to_remove_small_parts_at_right; extern const MergeTreeSettingsBool merge_selector_enable_heuristic_to_remove_small_parts_at_right;
extern const MergeTreeSettingsFloat merge_selector_base; extern const MergeTreeSettingsFloat merge_selector_base;
extern const MergeTreeSettingsUInt64 merge_selector_max_ranges_to_select_at_once;
extern const MergeTreeSettingsUInt64 min_parts_to_merge_at_once; extern const MergeTreeSettingsUInt64 min_parts_to_merge_at_once;
} }
@ -165,7 +166,7 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation() const
} }
SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
FutureMergedMutatedPartPtr future_part, FutureParts & out_future_parts,
bool aggressive, bool aggressive,
size_t max_total_size_to_merge, size_t max_total_size_to_merge,
const AllowedMergingPredicate & can_merge_callback, const AllowedMergingPredicate & can_merge_callback,
@ -192,7 +193,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
return SelectPartsDecision::CANNOT_SELECT; return SelectPartsDecision::CANNOT_SELECT;
} }
auto res = selectPartsToMergeFromRanges(future_part, aggressive, max_total_size_to_merge, merge_with_ttl_allowed, auto res = selectPartsToMergeFromRanges(out_future_parts, aggressive, max_total_size_to_merge, merge_with_ttl_allowed,
metadata_snapshot, info.parts_ranges, info.current_time, out_disable_reason); metadata_snapshot, info.parts_ranges, info.current_time, out_disable_reason);
if (res == SelectPartsDecision::SELECTED) if (res == SelectPartsDecision::SELECTED)
@ -201,7 +202,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
String best_partition_id_to_optimize = getBestPartitionToOptimizeEntire(info.partitions_info); String best_partition_id_to_optimize = getBestPartitionToOptimizeEntire(info.partitions_info);
if (!best_partition_id_to_optimize.empty()) if (!best_partition_id_to_optimize.empty())
{ {
return selectAllPartsToMergeWithinPartition( auto future_part = std::make_shared<FutureMergedMutatedPart>();
res = selectAllPartsToMergeWithinPartition(
future_part, future_part,
can_merge_callback, can_merge_callback,
best_partition_id_to_optimize, best_partition_id_to_optimize,
@ -210,6 +212,10 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
txn, txn,
out_disable_reason, out_disable_reason,
/*optimize_skip_merged_partitions=*/true); /*optimize_skip_merged_partitions=*/true);
out_future_parts.emplace_back(future_part);
return res;
} }
if (!out_disable_reason.text.empty()) if (!out_disable_reason.text.empty())
@ -258,11 +264,11 @@ MergeTreeDataMergerMutator::PartitionIdsHint MergeTreeDataMergerMutator::getPart
for (size_t i = 0; i < all_partition_ids.size(); ++i) for (size_t i = 0; i < all_partition_ids.size(); ++i)
{ {
auto future_part = std::make_shared<FutureMergedMutatedPart>(); FutureParts out_future_parts;
PreformattedMessage out_disable_reason; PreformattedMessage out_disable_reason;
/// This method should have been const, but something went wrong... it's const with dry_run = true /// This method should have been const, but something went wrong... it's const with dry_run = true
auto status = const_cast<MergeTreeDataMergerMutator *>(this)->selectPartsToMergeFromRanges( auto status = const_cast<MergeTreeDataMergerMutator *>(this)->selectPartsToMergeFromRanges(
future_part, /*aggressive*/ false, max_total_size_to_merge, merge_with_ttl_allowed, out_future_parts, /*aggressive*/ false, max_total_size_to_merge, merge_with_ttl_allowed,
metadata_snapshot, ranges_per_partition[i], info.current_time, out_disable_reason, metadata_snapshot, ranges_per_partition[i], info.current_time, out_disable_reason,
/* dry_run */ true); /* dry_run */ true);
if (status == SelectPartsDecision::SELECTED) if (status == SelectPartsDecision::SELECTED)
@ -483,7 +489,7 @@ MergeTreeDataMergerMutator::MergeSelectingInfo MergeTreeDataMergerMutator::getPo
} }
SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges( SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
FutureMergedMutatedPartPtr future_part, FutureParts & out_future_parts,
bool aggressive, bool aggressive,
size_t max_total_size_to_merge, size_t max_total_size_to_merge,
bool merge_with_ttl_allowed, bool merge_with_ttl_allowed,
@ -495,7 +501,18 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
{ {
Stopwatch select_parts_from_ranges_timer; Stopwatch select_parts_from_ranges_timer;
const auto data_settings = data.getSettings(); const auto data_settings = data.getSettings();
IMergeSelector::PartsRange parts_to_merge; auto get_parts_from_range = [] (const IMergeSelector::PartsRange & parts_to_merge)
{
MergeTreeData::DataPartsVector parts;
parts.reserve(parts_to_merge.size());
for (const IMergeSelector::Part & part_info : parts_to_merge)
{
const MergeTreeData::DataPartPtr & part = part_info.getDataPartPtr();
parts.push_back(part);
}
return parts;
};
if (metadata_snapshot->hasAnyTTL() && merge_with_ttl_allowed && !ttl_merges_blocker.isCancelled()) if (metadata_snapshot->hasAnyTTL() && merge_with_ttl_allowed && !ttl_merges_blocker.isCancelled())
{ {
@ -512,10 +529,13 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
TTLDeleteMergeSelector drop_ttl_selector(params_drop); TTLDeleteMergeSelector drop_ttl_selector(params_drop);
/// The size of the completely expired part of TTL drop is not affected by the merge pressure and the size of the storage space /// The size of the completely expired part of TTL drop is not affected by the merge pressure and the size of the storage space
parts_to_merge = drop_ttl_selector.select(parts_ranges, (*data_settings)[MergeTreeSetting::max_bytes_to_merge_at_max_space_in_pool]); auto parts_to_merge = drop_ttl_selector.selectBest(parts_ranges, (*data_settings)[MergeTreeSetting::max_bytes_to_merge_at_max_space_in_pool]);
if (!parts_to_merge.empty()) if (!parts_to_merge.empty())
{ {
auto future_part = std::make_shared<FutureMergedMutatedPart>();
future_part->merge_type = MergeType::TTLDelete; future_part->merge_type = MergeType::TTLDelete;
future_part->assign(get_parts_from_range(parts_to_merge));
out_future_parts.emplace_back(future_part);
} }
else if (!(*data_settings)[MergeTreeSetting::ttl_only_drop_parts]) else if (!(*data_settings)[MergeTreeSetting::ttl_only_drop_parts])
{ {
@ -529,9 +549,14 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
}; };
TTLDeleteMergeSelector delete_ttl_selector(params_delete); TTLDeleteMergeSelector delete_ttl_selector(params_delete);
parts_to_merge = delete_ttl_selector.select(parts_ranges, max_total_size_to_merge); parts_to_merge = delete_ttl_selector.selectBest(parts_ranges, max_total_size_to_merge);
if (!parts_to_merge.empty()) if (!parts_to_merge.empty())
{
auto future_part = std::make_shared<FutureMergedMutatedPart>();
future_part->merge_type = MergeType::TTLDelete; future_part->merge_type = MergeType::TTLDelete;
future_part->assign(get_parts_from_range(parts_to_merge));
out_future_parts.emplace_back(future_part);
}
} }
if (parts_to_merge.empty() && metadata_snapshot->hasAnyRecompressionTTL()) if (parts_to_merge.empty() && metadata_snapshot->hasAnyRecompressionTTL())
@ -547,13 +572,19 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
TTLRecompressMergeSelector recompress_ttl_selector(params); TTLRecompressMergeSelector recompress_ttl_selector(params);
parts_to_merge = recompress_ttl_selector.select(parts_ranges, max_total_size_to_merge); parts_to_merge = recompress_ttl_selector.selectBest(parts_ranges, max_total_size_to_merge);
if (!parts_to_merge.empty()) if (!parts_to_merge.empty())
{
auto future_part = std::make_shared<FutureMergedMutatedPart>();
future_part->merge_type = MergeType::TTLRecompress; future_part->merge_type = MergeType::TTLRecompress;
future_part->assign(get_parts_from_range(parts_to_merge));
out_future_parts.emplace_back(future_part);
}
} }
} }
if (parts_to_merge.empty()) /// Nothing is selected with specialized selectors
if (out_future_parts.empty())
{ {
auto merge_selector_algorithm = (*data_settings)[MergeTreeSetting::merge_selector_algorithm]; auto merge_selector_algorithm = (*data_settings)[MergeTreeSetting::merge_selector_algorithm];
@ -586,32 +617,34 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
merge_settings = simple_merge_settings; merge_settings = simple_merge_settings;
} }
parts_to_merge = MergeSelectorFactory::instance().get(merge_selector_algorithm, merge_settings)->select(parts_ranges, max_total_size_to_merge); auto parts_to_merge_ranges = MergeSelectorFactory::instance().get(merge_selector_algorithm, merge_settings)->selectUpToTopN(parts_ranges, max_total_size_to_merge, (*data_settings)[MergeTreeSetting::merge_selector_max_ranges_to_select_at_once]);
/// Do not allow to "merge" part with itself for regular merges, unless it is a TTL-merge where it is ok to remove some values with expired ttl for (const auto & parts_to_merge : parts_to_merge_ranges)
if (parts_to_merge.size() == 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Merge selector returned only one part to merge");
if (parts_to_merge.empty())
{ {
ProfileEvents::increment(ProfileEvents::MergerMutatorSelectPartsForMergeElapsedMicroseconds, select_parts_from_ranges_timer.elapsedMicroseconds()); /// Do not allow to "merge" part with itself for regular merges, unless it is a TTL-merge where it is ok to remove some values with expired ttl
out_disable_reason = PreformattedMessage::create("Did not find any parts to merge (with usual merge selectors) in {}ms", select_parts_from_ranges_timer.elapsedMicroseconds() / 1000); if (parts_to_merge.size() == 1)
return SelectPartsDecision::CANNOT_SELECT; throw Exception(ErrorCodes::LOGICAL_ERROR, "Merge selector returned only one part to merge");
if (parts_to_merge.empty())
{
chassert(parts_to_merge_ranges.size() == 1);
ProfileEvents::increment(ProfileEvents::MergerMutatorSelectPartsForMergeElapsedMicroseconds, select_parts_from_ranges_timer.elapsedMicroseconds());
out_disable_reason = PreformattedMessage::create("Did not find any parts to merge (with usual merge selectors) in {}ms", select_parts_from_ranges_timer.elapsedMicroseconds() / 1000);
return SelectPartsDecision::CANNOT_SELECT;
}
else
{
auto future_part = std::make_shared<FutureMergedMutatedPart>();
future_part->merge_type = MergeType::Regular;
auto parts = get_parts_from_range(parts_to_merge);
ProfileEvents::increment(ProfileEvents::MergerMutatorSelectRangePartsCount, parts.size());
LOG_DEBUG(log, "Selected {} parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name);
future_part->assign(std::move(parts));
out_future_parts.emplace_back(future_part);
}
} }
} }
LOG_DEBUG(log, "Selected {} parts ranges in {}ms", out_future_parts.size(), select_parts_from_ranges_timer.elapsedMicroseconds() / 1000);
MergeTreeData::DataPartsVector parts;
parts.reserve(parts_to_merge.size());
for (IMergeSelector::Part & part_info : parts_to_merge)
{
const MergeTreeData::DataPartPtr & part = part_info.getDataPartPtr();
parts.push_back(part);
}
LOG_DEBUG(log, "Selected {} parts from {} to {} in {}ms", parts.size(), parts.front()->name, parts.back()->name, select_parts_from_ranges_timer.elapsedMicroseconds() / 1000);
ProfileEvents::increment(ProfileEvents::MergerMutatorSelectRangePartsCount, parts.size());
future_part->assign(std::move(parts));
ProfileEvents::increment(ProfileEvents::MergerMutatorSelectPartsForMergeElapsedMicroseconds, select_parts_from_ranges_timer.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::MergerMutatorSelectPartsForMergeElapsedMicroseconds, select_parts_from_ranges_timer.elapsedMicroseconds());
return SelectPartsDecision::SELECTED; return SelectPartsDecision::SELECTED;
} }

View File

@ -96,7 +96,7 @@ public:
/// The third step of selecting parts to merge: takes ranges that we can merge, and selects parts that we want to merge /// The third step of selecting parts to merge: takes ranges that we can merge, and selects parts that we want to merge
SelectPartsDecision selectPartsToMergeFromRanges( SelectPartsDecision selectPartsToMergeFromRanges(
FutureMergedMutatedPartPtr future_part, FutureParts & out_future_parts,
bool aggressive, bool aggressive,
size_t max_total_size_to_merge, size_t max_total_size_to_merge,
bool merge_with_ttl_allowed, bool merge_with_ttl_allowed,
@ -125,7 +125,7 @@ public:
* - A part that already merges with something in one place, you can not start to merge into something else in another place. * - A part that already merges with something in one place, you can not start to merge into something else in another place.
*/ */
SelectPartsDecision selectPartsToMerge( SelectPartsDecision selectPartsToMerge(
FutureMergedMutatedPartPtr future_part, FutureParts & out_future_parts,
bool aggressive, bool aggressive,
size_t max_total_size_to_merge, size_t max_total_size_to_merge,
const AllowedMergingPredicate & can_merge, const AllowedMergingPredicate & can_merge,

View File

@ -59,6 +59,7 @@ namespace ErrorCodes
/** Merge selector settings. */ \ /** Merge selector settings. */ \
DECLARE(UInt64, merge_selector_blurry_base_scale_factor, 0, "Controls when the logic kicks in relatively to the number of parts in partition. The bigger the factor the more belated reaction will be.", 0) \ DECLARE(UInt64, merge_selector_blurry_base_scale_factor, 0, "Controls when the logic kicks in relatively to the number of parts in partition. The bigger the factor the more belated reaction will be.", 0) \
DECLARE(UInt64, merge_selector_window_size, 1000, "How many parts to look at once.", 0) \ DECLARE(UInt64, merge_selector_window_size, 1000, "How many parts to look at once.", 0) \
DECLARE(UInt64, merge_selector_max_ranges_to_select_at_once, 10, "How many ranges merge selector will return per one invocation. Work only for SimpleMergeSelector and StochasticSimpleMergeSelector.", 0) \
\ \
/** Merge settings. */ \ /** Merge settings. */ \
DECLARE(UInt64, merge_max_block_size, 8192, "How many rows in blocks should be formed for merge operations. By default has the same value as `index_granularity`.", 0) \ DECLARE(UInt64, merge_max_block_size, 8192, "How many rows in blocks should be formed for merge operations. By default has the same value as `index_granularity`.", 0) \

View File

@ -994,10 +994,12 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
{ {
auto data_settings = getSettings(); auto data_settings = getSettings();
auto future_part = std::make_shared<FutureMergedMutatedPart>(); FutureParts future_parts;
UUID part_uuid = UUIDHelpers::Nil;
if ((*storage_settings.get())[MergeTreeSetting::assign_part_uuids]) if ((*storage_settings.get())[MergeTreeSetting::assign_part_uuids])
future_part->uuid = UUIDHelpers::generateV4(); part_uuid = UUIDHelpers::generateV4();
/// You must call destructor with unlocked `currently_processing_in_background_mutex`. /// You must call destructor with unlocked `currently_processing_in_background_mutex`.
CurrentlyMergingPartsTaggerPtr merging_tagger; CurrentlyMergingPartsTaggerPtr merging_tagger;
@ -1087,7 +1089,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
if (max_source_parts_size > 0) if (max_source_parts_size > 0)
{ {
select_decision = merger_mutator.selectPartsToMerge( select_decision = merger_mutator.selectPartsToMerge(
future_part, future_parts,
aggressive, aggressive,
max_source_parts_size, max_source_parts_size,
can_merge, can_merge,
@ -1124,9 +1126,12 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
break; break;
} }
auto future_part = std::make_shared<FutureMergedMutatedPart>();
select_decision = merger_mutator.selectAllPartsToMergeWithinPartition( select_decision = merger_mutator.selectAllPartsToMergeWithinPartition(
future_part, can_merge, partition_id, final, metadata_snapshot, txn, out_disable_reason, optimize_skip_merged_partitions); future_part, can_merge, partition_id, final, metadata_snapshot, txn, out_disable_reason, optimize_skip_merged_partitions);
future_parts.push_back(future_part);
/// If final - we will wait for currently processing merges to finish and continue. /// If final - we will wait for currently processing merges to finish and continue.
if (final if (final
&& select_decision != SelectPartsDecision::SELECTED && select_decision != SelectPartsDecision::SELECTED
@ -1142,7 +1147,9 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
} }
} }
else else
{
break; break;
}
} }
} }
@ -1160,12 +1167,14 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
return {}; return {};
} }
auto best_future_part = future_parts.front();
best_future_part->uuid = part_uuid;
/// Account TTL merge here to avoid exceeding the max_number_of_merges_with_ttl_in_pool limit /// Account TTL merge here to avoid exceeding the max_number_of_merges_with_ttl_in_pool limit
if (isTTLMergeType(future_part->merge_type)) if (isTTLMergeType(best_future_part->merge_type))
getContext()->getMergeList().bookMergeWithTTL(); getContext()->getMergeList().bookMergeWithTTL();
merging_tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part->parts, true), *this, metadata_snapshot, false); merging_tagger = std::make_unique<CurrentlyMergingPartsTagger>(best_future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(best_future_part->parts, true), *this, metadata_snapshot, false);
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(merging_tagger), std::make_shared<MutationCommands>()); return std::make_shared<MergeMutateSelectedEntry>(best_future_part, std::move(merging_tagger), std::make_shared<MutationCommands>());
} }
bool StorageMergeTree::merge( bool StorageMergeTree::merge(

View File

@ -3921,9 +3921,6 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
bool merge_with_ttl_allowed = merges_and_mutations_queued.merges_with_ttl < (*storage_settings_ptr)[MergeTreeSetting::max_replicated_merges_with_ttl_in_queue] && bool merge_with_ttl_allowed = merges_and_mutations_queued.merges_with_ttl < (*storage_settings_ptr)[MergeTreeSetting::max_replicated_merges_with_ttl_in_queue] &&
getTotalMergesWithTTLInMergeList() < (*storage_settings_ptr)[MergeTreeSetting::max_number_of_merges_with_ttl_in_pool]; getTotalMergesWithTTLInMergeList() < (*storage_settings_ptr)[MergeTreeSetting::max_number_of_merges_with_ttl_in_pool];
auto future_merged_part = std::make_shared<FutureMergedMutatedPart>();
if ((*storage_settings.get())[MergeTreeSetting::assign_part_uuids])
future_merged_part->uuid = UUIDHelpers::generateV4();
bool can_assign_merge = max_source_parts_size_for_merge > 0; bool can_assign_merge = max_source_parts_size_for_merge > 0;
PartitionIdsHint partitions_to_merge_in; PartitionIdsHint partitions_to_merge_in;
@ -3938,11 +3935,20 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
merge_pred.emplace(queue.getMergePredicate(zookeeper, partitions_to_merge_in)); merge_pred.emplace(queue.getMergePredicate(zookeeper, partitions_to_merge_in));
} }
UUID part_uuid = UUIDHelpers::Nil;
if ((*storage_settings.get())[MergeTreeSetting::assign_part_uuids])
part_uuid = UUIDHelpers::generateV4();
FutureParts future_parts;
PreformattedMessage out_reason; PreformattedMessage out_reason;
if (can_assign_merge && if (can_assign_merge &&
merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, *merge_pred, merger_mutator.selectPartsToMerge(future_parts, false, max_source_parts_size_for_merge, *merge_pred,
merge_with_ttl_allowed, NO_TRANSACTION_PTR, out_reason, &partitions_to_merge_in) == SelectPartsDecision::SELECTED) merge_with_ttl_allowed, NO_TRANSACTION_PTR, out_reason, &partitions_to_merge_in) == SelectPartsDecision::SELECTED)
{ {
auto future_merged_part = future_parts.front();
future_merged_part->uuid = part_uuid;
create_result = createLogEntryToMergeParts( create_result = createLogEntryToMergeParts(
zookeeper, zookeeper,
future_merged_part->parts, future_merged_part->parts,
@ -3986,7 +3992,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
create_result = createLogEntryToMutatePart( create_result = createLogEntryToMutatePart(
*part, *part,
future_merged_part->uuid, part_uuid,
desired_mutation_version->first, desired_mutation_version->first,
desired_mutation_version->second, desired_mutation_version->second,
merge_pred->getVersion()); merge_pred->getVersion());
@ -5943,22 +5949,24 @@ bool StorageReplicatedMergeTree::optimize(
} }
ReplicatedMergeTreeMergePredicate can_merge = queue.getMergePredicate(zookeeper, std::move(partition_ids_hint)); ReplicatedMergeTreeMergePredicate can_merge = queue.getMergePredicate(zookeeper, std::move(partition_ids_hint));
auto future_merged_part = std::make_shared<FutureMergedMutatedPart>(); auto part_uuid = UUIDHelpers::Nil;
if ((*storage_settings.get())[MergeTreeSetting::assign_part_uuids]) if ((*storage_settings.get())[MergeTreeSetting::assign_part_uuids])
future_merged_part->uuid = UUIDHelpers::generateV4(); part_uuid = UUIDHelpers::generateV4();
constexpr const char * unknown_disable_reason = "unknown reason"; constexpr const char * unknown_disable_reason = "unknown reason";
PreformattedMessage disable_reason = PreformattedMessage::create(unknown_disable_reason); PreformattedMessage disable_reason = PreformattedMessage::create(unknown_disable_reason);
SelectPartsDecision select_decision = SelectPartsDecision::CANNOT_SELECT; SelectPartsDecision select_decision = SelectPartsDecision::CANNOT_SELECT;
FutureParts future_parts;
if (partition_id.empty()) if (partition_id.empty())
{ {
select_decision = merger_mutator.selectPartsToMerge( select_decision = merger_mutator.selectPartsToMerge(
future_merged_part, /* aggressive */ true, (*storage_settings_ptr)[MergeTreeSetting::max_bytes_to_merge_at_max_space_in_pool], future_parts, /* aggressive */ true, (*storage_settings_ptr)[MergeTreeSetting::max_bytes_to_merge_at_max_space_in_pool],
can_merge, /* merge_with_ttl_allowed */ false, NO_TRANSACTION_PTR, disable_reason); can_merge, /* merge_with_ttl_allowed */ false, NO_TRANSACTION_PTR, disable_reason);
} }
else else
{ {
auto future_merged_part = std::make_shared<FutureMergedMutatedPart>();
select_decision = merger_mutator.selectAllPartsToMergeWithinPartition( select_decision = merger_mutator.selectAllPartsToMergeWithinPartition(
future_merged_part, future_merged_part,
can_merge, can_merge,
@ -5968,6 +5976,9 @@ bool StorageReplicatedMergeTree::optimize(
NO_TRANSACTION_PTR, NO_TRANSACTION_PTR,
disable_reason, disable_reason,
query_context->getSettingsRef()[Setting::optimize_skip_merged_partitions]); query_context->getSettingsRef()[Setting::optimize_skip_merged_partitions]);
if (select_decision == SelectPartsDecision::SELECTED)
future_parts.push_back(future_merged_part);
} }
/// If there is nothing to merge then we treat this merge as successful (needed for optimize final optimization) /// If there is nothing to merge then we treat this merge as successful (needed for optimize final optimization)
@ -5994,6 +6005,8 @@ bool StorageReplicatedMergeTree::optimize(
return handle_noop(message_fmt, disable_reason.text); return handle_noop(message_fmt, disable_reason.text);
} }
auto future_merged_part = future_parts.front();
future_merged_part->uuid = part_uuid;
ReplicatedMergeTreeLogEntryData merge_entry; ReplicatedMergeTreeLogEntryData merge_entry;
CreateMergeEntryResult create_result = createLogEntryToMergeParts( CreateMergeEntryResult create_result = createLogEntryToMergeParts(
zookeeper, future_merged_part->parts, zookeeper, future_merged_part->parts,

View File

@ -51,7 +51,7 @@ int main(int, char **)
while (parts.size() > 1) while (parts.size() > 1)
{ {
IMergeSelector::PartsRange selected_parts = selector.select(partitions, 0); IMergeSelector::PartsRange selected_parts = selector.selectBest(partitions, 0);
if (selected_parts.empty()) if (selected_parts.empty())
{ {

View File

@ -48,7 +48,7 @@ int main(int, char **)
while (parts.size() > 1) while (parts.size() > 1)
{ {
IMergeSelector::PartsRange selected_parts = selector.select(partitions, 100ULL * 1024 * 1024 * 1024); IMergeSelector::PartsRange selected_parts = selector.selectBest(partitions, 100ULL * 1024 * 1024 * 1024);
if (selected_parts.empty()) if (selected_parts.empty())
{ {