From ddba07b9c32fef84e764972c241991e7dec72745 Mon Sep 17 00:00:00 2001
From: taiyang-li <654010905@qq.com>
Date: Wed, 15 Feb 2023 21:19:50 +0800
Subject: [PATCH] improve performance

---
 src/AggregateFunctions/QuantileGK.h | 77 ++++++++++++++++------------
 1 file changed, 44 insertions(+), 33 deletions(-)

diff --git a/src/AggregateFunctions/QuantileGK.h b/src/AggregateFunctions/QuantileGK.h
index 6dba8ac5723..3616a276559 100644
--- a/src/AggregateFunctions/QuantileGK.h
+++ b/src/AggregateFunctions/QuantileGK.h
@@ -1,7 +1,8 @@
 #pragma once
-
 #include
+#include <base/sort.h>
+#include <Common/RadixSort.h>
 #include
 #include
 #include
@@ -53,6 +54,8 @@ public:
         , compressed(compressed_)
         , sampled(sampled_)
     {
+        sampled.reserve(compress_threshold);
+        backup_sampled.reserve(compress_threshold);
     }
 
     bool isCompressed() const { return compressed; }
@@ -113,9 +116,8 @@ public:
     void compress()
     {
         withHeadBufferInserted();
 
-        auto compressed_samples = doCompress(sampled, 2 * relative_error * count);
-        std::swap(sampled, compressed_samples);
+        doCompress(2 * relative_error * count);
 
         compressed = true;
     }
@@ -165,8 +167,8 @@ public:
             // `max(g_ab + delta_ab) <= floor(2 * eps_ab * (n_a + n_b))` since
             // `max(g_ab + delta_ab) <= floor(2 * eps_a * n_a) + floor(2 * eps_b * n_b)`
             // Finally, one can see how the `insert(x)` operation can be expressed as `merge([(x, 1, 0])`
-            std::vector<Stats> merged_sampled;
-            merged_sampled.reserve(sampled.size() + other.sampled.size());
+            backup_sampled.clear();
+            backup_sampled.reserve(sampled.size() + other.sampled.size());
             double merged_relative_error = std::max(relative_error, other.relative_error);
             size_t merged_count = count + other.count;
             Int64 additional_self_delta = static_cast<Int64>(std::floor(2 * other.relative_error * other.count));
@@ -198,27 +200,28 @@ public:
 
                 // Insert it
                 next_sample.delta += additional_delta;
-                merged_sampled.push_back(next_sample);
+                backup_sampled.emplace_back(std::move(next_sample));
             }
 
             // Copy the remaining samples from the other list
             // (by construction, at most one `while` loop will run)
             while (self_idx < sampled.size())
             {
-                merged_sampled.push_back(sampled[self_idx]);
+                backup_sampled.emplace_back(sampled[self_idx]);
                 ++self_idx;
             }
             while (other_idx < other.sampled.size())
             {
-                merged_sampled.emplace_back(other.sampled[other_idx]);
+                backup_sampled.emplace_back(other.sampled[other_idx]);
                 ++other_idx;
             }
 
-            auto compressed_sampled = doCompress(merged_sampled, 2 * merged_relative_error * merged_count);
-            compress_threshold = other.compress_threshold;
+            std::swap(sampled, backup_sampled);
             relative_error = merged_relative_error;
-            std::swap(compressed_sampled, sampled);
             count = merged_count;
+            compress_threshold = other.compress_threshold;
+
+            doCompress(2 * merged_relative_error * merged_count);
             compressed = true;
         }
     }
@@ -287,13 +290,18 @@ private:
         if (head_sampled.empty())
             return;
 
-        size_t current_count = count;
-        std::sort(head_sampled.begin(), head_sampled.end());
-        std::vector<Stats> new_samples;
-        new_samples.reserve(sampled.size() + head_sampled.size());
+        bool use_radix_sort = head_sampled.size() >= 256 && (is_arithmetic_v<T> && !is_big_int_v<T>);
+        if (use_radix_sort)
+            RadixSort<RadixSortNumTraits<T>>::executeLSD(head_sampled.data(), head_sampled.size());
+        else
+            ::sort(head_sampled.begin(), head_sampled.end());
+
+        backup_sampled.clear();
+        backup_sampled.reserve(sampled.size() + head_sampled.size());
         size_t sample_idx = 0;
         size_t ops_idx = 0;
+        size_t current_count = count;
 
         for (; ops_idx < head_sampled.size(); ++ops_idx)
         {
             T current_sample = head_sampled[ops_idx];
@@ -301,47 +309,47 @@ private:
             // Add all the samples before the next observation.
             while (sample_idx < sampled.size() && sampled[sample_idx].value <= current_sample)
             {
-                new_samples.push_back(sampled[sample_idx]);
+                backup_sampled.emplace_back(sampled[sample_idx]);
                 ++sample_idx;
             }
 
             // If it is the first one to insert, of if it is the last one
             ++current_count;
             Int64 delta;
-            if (new_samples.empty() || (sample_idx == sampled.size() && ops_idx == (head_sampled.size() - 1)))
+            if (backup_sampled.empty() || (sample_idx == sampled.size() && ops_idx == (head_sampled.size() - 1)))
                 delta = 0;
            else
                 delta = static_cast<Int64>(std::floor(2 * relative_error * current_count));
 
-            new_samples.emplace_back(current_sample, 1, delta);
+            backup_sampled.emplace_back(current_sample, 1, delta);
         }
 
         // Add all the remaining existing samples
         for (; sample_idx < sampled.size(); ++sample_idx)
-            new_samples.push_back(sampled[sample_idx]);
+            backup_sampled.emplace_back(sampled[sample_idx]);
 
-        std::swap(sampled, new_samples);
+        std::swap(sampled, backup_sampled);
         head_sampled.clear();
         count = current_count;
     }
 
-    static std::vector<Stats> doCompress(const std::vector<Stats> & current_samples, double merge_threshold)
+    void doCompress(double merge_threshold)
     {
-        if (current_samples.empty())
-            return {};
+        if (sampled.empty())
+            return;
 
-        std::vector<Stats> res;
+        backup_sampled.clear();
         // Start for the last element, which is always part of the set.
         // The head contains the current new head, that may be merged with the current element.
-        auto head = current_samples.back();
-        ssize_t i = current_samples.size() - 2;
+        Stats head = sampled.back();
+        ssize_t i = sampled.size() - 2;
 
         // Do not compress the last element
         while (i >= 1)
         {
             // The current sample:
-            const auto & sample1 = current_samples[i];
+            const auto & sample1 = sampled[i];
 
             // Do we need to compress?
             if (sample1.g + head.g + head.delta < merge_threshold)
             {
@@ -351,22 +359,23 @@
             else
             {
                 // Prepend the current head, and keep the current sample as target for merging.
-                res.insert(res.begin(), head);
+                backup_sampled.push_back(head);
                 head = sample1;
             }
 
             --i;
         }
-        res.insert(res.begin(), head);
+        backup_sampled.push_back(head);
 
         // If necessary, add the minimum element:
-        auto curr_head = current_samples.front();
+        auto curr_head = sampled.front();
         // don't add the minimum element if `currentSamples` has only one element (both `currHead` and
         // `head` point to the same element)
-        if (curr_head.value <= head.value && current_samples.size() > 1)
-            res.insert(res.begin(), current_samples.front());
+        if (curr_head.value <= head.value && sampled.size() > 1)
+            backup_sampled.emplace_back(sampled.front());
 
-        return res;
+        std::reverse(backup_sampled.begin(), backup_sampled.end());
+        std::swap(sampled, backup_sampled);
     }
 
     double relative_error;
@@ -375,6 +384,8 @@
     bool compressed;
 
     std::vector<Stats> sampled;
+    std::vector<Stats> backup_sampled;
+    std::vector<T> head_sampled;
 
     static constexpr size_t default_compress_threshold = 10000;
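
The heart of the patch is the rewrite of doCompress(): instead of building a fresh vector and prepending each surviving tuple with insert(res.begin(), ...), survivors are appended to a reusable scratch buffer, reversed once, and swapped back into `sampled`. Below is a minimal stand-alone sketch of that pattern, not the ClickHouse code itself: the `Stats` struct is reduced to plain fields and `compressInPlace` is an illustrative name rather than the actual member function.

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Reduced stand-in for the sketch tuple (value, g, delta) kept by the GK algorithm.
struct Stats
{
    double value;
    int64_t g;
    int64_t delta;
};

// Compress `sampled` in place, reusing `backup` as scratch space.
// Survivors are appended and reversed once at the end instead of being
// prepended with insert(begin(), ...), which costs O(n) per insertion.
void compressInPlace(std::vector<Stats> & sampled, std::vector<Stats> & backup, double merge_threshold)
{
    if (sampled.empty())
        return;

    backup.clear();

    // Walk backwards; `head` is the element that earlier samples may be merged into.
    Stats head = sampled.back();
    for (std::ptrdiff_t i = static_cast<std::ptrdiff_t>(sampled.size()) - 2; i >= 1; --i)
    {
        const Stats & sample = sampled[i];
        if (sample.g + head.g + head.delta < merge_threshold)
        {
            // Merge the current sample into the head.
            head.g += sample.g;
        }
        else
        {
            // Keep the head; appended now, order restored by the final reverse.
            backup.push_back(head);
            head = sample;
        }
    }
    backup.push_back(head);

    // Keep the minimum element unless it is the only element.
    if (sampled.size() > 1 && sampled.front().value <= head.value)
        backup.push_back(sampled.front());

    // One reversal restores ascending order; the swap avoids reallocating `sampled`.
    std::reverse(backup.begin(), backup.end());
    std::swap(sampled, backup);
}

Appending plus a single std::reverse turns the quadratic prepend pattern into a linear pass, and swapping with a long-lived backup buffer (also reserved up front in the constructor) avoids an allocation on every compression and merge.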