From 9b551d8ad219ecf11d414cf034cfcecb0d150d83 Mon Sep 17 00:00:00 2001
From: serxa
Date: Thu, 24 Oct 2024 20:20:53 +0000
Subject: [PATCH] wip: new merge selector

---
 src/Core/MergeSelectorAlgorithm.h             |   1 +
 .../MergeSelectors/ComplexMergeSelector.cpp   | 159 ++++++++++++++++++
 .../MergeSelectors/ComplexMergeSelector.h     |  32 ++++
 3 files changed, 192 insertions(+)
 create mode 100644 src/Storages/MergeTree/MergeSelectors/ComplexMergeSelector.cpp
 create mode 100644 src/Storages/MergeTree/MergeSelectors/ComplexMergeSelector.h

diff --git a/src/Core/MergeSelectorAlgorithm.h b/src/Core/MergeSelectorAlgorithm.h
index 0f6831c1f9e..8af23d0f885 100644
--- a/src/Core/MergeSelectorAlgorithm.h
+++ b/src/Core/MergeSelectorAlgorithm.h
@@ -8,6 +8,7 @@ enum class MergeSelectorAlgorithm : uint8_t
 {
     SIMPLE,
     STOCHASTIC_SIMPLE,
+    COMPLEX,
 };
 
 }
diff --git a/src/Storages/MergeTree/MergeSelectors/ComplexMergeSelector.cpp b/src/Storages/MergeTree/MergeSelectors/ComplexMergeSelector.cpp
new file mode 100644
index 00000000000..e988835116f
--- /dev/null
+++ b/src/Storages/MergeTree/MergeSelectors/ComplexMergeSelector.cpp
@@ -0,0 +1,159 @@
+#include
+#include
+#include
+
+#include
+#include
+
+#include
+#include
+
+
+namespace DB
+{
+
+/// Registers the selector in the factory under the public name "Complex", bound to MergeSelectorAlgorithm::COMPLEX.
+void registerComplexMergeSelector(MergeSelectorFactory & factory)
+{
+    factory.registerPublicSelector("Complex", MergeSelectorAlgorithm::COMPLEX, [](const std::any & settings)
+    {
+        /// Template arguments are mandatory here: neither std::make_shared nor std::any_cast can deduce them.
+        return std::make_shared<ComplexMergeSelector>(std::any_cast<ComplexMergeSelector::Settings>(settings));
+    });
+}
+
+namespace
+{
+
+/// Returns size * log2(size), using the limit value 0 for size == 0.
+/// Computed naively, a zero-sized part gives 0 * log2(0) = 0 * -inf = NaN, and that NaN
+/// would propagate into the score of every range containing the part.
+double sizeLogSize(size_t size)
+{
+    return size ? static_cast<double>(size) * log2(static_cast<double>(size)) : 0.0;
+}
+
+/** Estimates best set of parts to merge within passed alternatives.
+  * Remembers the candidate range with the highest score offered so far.
+  */
+struct Estimator
+{
+    using Iterator = ComplexMergeSelector::PartsRange::const_iterator;
+
+    /// Offers the candidate range [begin, end) together with its precomputed sums.
+    /// It replaces the current best only if its score is strictly higher.
+    void consider(Iterator begin, Iterator end, size_t sum_size, double sum_size_log_size, const ComplexMergeSelector::Settings &)
+    {
+        double current_score = score(sum_size, sum_size_log_size);
+
+        /// An explicit flag instead of comparing max_score with 0.0: zero is a legitimate score.
+        if (!has_best || current_score > max_score)
+        {
+            has_best = true;
+            max_score = current_score;
+            best_begin = begin;
+            best_end = end;
+        }
+    }
+
+    /// Returns the best range offered so far, or an empty range if nothing was offered.
+    ComplexMergeSelector::PartsRange getBest() const
+    {
+        if (!has_best)
+            return {};
+        return ComplexMergeSelector::PartsRange(best_begin, best_end);
+    }
+
+    /// Entropy (in bits) of the distribution of part sizes within a range:
+    /// log2(S) - sum(s_i * log2(s_i)) / S, where S = sum(s_i).
+    static double score(double sum_size, double sum_size_log_size)
+    {
+        /// Without this guard a range of total size zero would evaluate log2(0) and 0 / 0.
+        if (sum_size <= 0.0)
+            return 0.0;
+
+        // TODO(serxa): check what is faster log or log2
+        // TODO(serxa): add comment explaining why entropy maximization is the goal
+        return log2(sum_size) - sum_size_log_size / sum_size;
+    }
+
+    bool has_best = false;
+    double max_score = 0.0;
+    Iterator best_begin {};
+    Iterator best_end {};
+};
+
+/// Offers to the estimator every run of two or more adjacent parts of `parts` that fits the limits.
+/// A run stops growing once it reaches a part with shall_participate_in_merges unset, exceeds
+/// settings.max_parts_to_merge_at_once, or exceeds max_total_size_to_merge (zero disables either limit).
+void selectWithinPartition(
+    const ComplexMergeSelector::PartsRange & parts,
+    const size_t max_total_size_to_merge,
+    Estimator & estimator,
+    const ComplexMergeSelector::Settings & settings)
+{
+    size_t parts_count = parts.size();
+    if (parts_count <= 1)
+        return;
+
+    for (size_t begin = 0; begin < parts_count; ++begin)
+    {
+        if (!parts[begin].shall_participate_in_merges)
+            continue;
+
+        // TODO(serxa): optimize out unnecessary log2() calls
+        size_t sum_size = parts[begin].size;
+        double sum_size_log_size = sizeLogSize(parts[begin].size);
+        /// NOTE(review): max_size and min_age are maintained but not read yet;
+        /// presumably they are meant for the constraint mentioned in the TODO below - confirm.
+        size_t max_size = parts[begin].size;
+        size_t min_age = parts[begin].age;
+
+        for (size_t end = begin + 2; end <= parts_count; ++end)
+        {
+            assert(end > begin);
+            if (settings.max_parts_to_merge_at_once && end - begin > settings.max_parts_to_merge_at_once)
+                break;
+
+            if (!parts[end - 1].shall_participate_in_merges)
+                break;
+
+            size_t cur_size = parts[end - 1].size;
+            size_t cur_age = parts[end - 1].age;
+
+            sum_size += cur_size;
+            sum_size_log_size += sizeLogSize(cur_size);
+            max_size = std::max(max_size, cur_size);
+            min_age = std::min(min_age, cur_age);
+
+            if (max_total_size_to_merge && sum_size > max_total_size_to_merge)
+                break;
+
+            // TODO(serxa): add constraint to control write amplification
+            estimator.consider(
+                parts.begin() + begin,
+                parts.begin() + end,
+                sum_size,
+                sum_size_log_size,
+                settings);
+        }
+    }
+}
+
+}
+
+/// Returns the highest-scoring candidate across all of parts_ranges,
+/// or an empty range when no candidate fits the limits.
+ComplexMergeSelector::PartsRange ComplexMergeSelector::select(
+    const PartsRanges & parts_ranges,
+    size_t max_total_size_to_merge)
+{
+    Estimator estimator;
+
+    for (const auto & part_range : parts_ranges)
+        selectWithinPartition(part_range, max_total_size_to_merge, estimator, settings);
+
+    return estimator.getBest();
+}
+
+}
diff --git a/src/Storages/MergeTree/MergeSelectors/ComplexMergeSelector.h b/src/Storages/MergeTree/MergeSelectors/ComplexMergeSelector.h
new file mode 100644
index 00000000000..8ec45a754f6
--- /dev/null
+++ b/src/Storages/MergeTree/MergeSelectors/ComplexMergeSelector.h
@@ -0,0 +1,32 @@
+#pragma once
+
+#include
+
+
+namespace DB
+{
+
+/// Merge selector that scores candidate ranges of adjacent parts by the entropy of their sizes
+/// and picks the range with the highest score.
+class ComplexMergeSelector final : public IMergeSelector
+{
+public:
+    struct Settings
+    {
+        /// Zero means unlimited. Can be overridden by the same merge tree setting.
+        size_t max_parts_to_merge_at_once = 100;
+    };
+
+    explicit ComplexMergeSelector(const Settings & settings_) : settings(settings_) {}
+
+    /// Returns the chosen range of parts, or an empty range if there is nothing to merge.
+    /// Zero max_total_size_to_merge means no limit on the total size.
+    PartsRange select(
+        const PartsRanges & parts_ranges,
+        size_t max_total_size_to_merge) override;
+
+private:
+    const Settings settings;
+};
+
+}