From f6f4285348451a79203d9cb6b07514e5347cd3c7 Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Fri, 2 Oct 2020 20:07:54 +0300 Subject: [PATCH] Improve quantileTDigest performance --- src/AggregateFunctions/QuantileTDigest.h | 157 ++++++++++++++++------- 1 file changed, 114 insertions(+), 43 deletions(-) diff --git a/src/AggregateFunctions/QuantileTDigest.h b/src/AggregateFunctions/QuantileTDigest.h index b90979c02b9..fc957adb739 100644 --- a/src/AggregateFunctions/QuantileTDigest.h +++ b/src/AggregateFunctions/QuantileTDigest.h @@ -36,7 +36,7 @@ namespace ErrorCodes * uses asin, which slows down the algorithm a bit. */ template -class QuantileTDigest +class TDigest { using Value = Float32; using Count = Float32; @@ -86,20 +86,12 @@ class QuantileTDigest /// The memory will be allocated to several elements at once, so that the state occupies 64 bytes. static constexpr size_t bytes_in_arena = 128 - sizeof(PODArray) - sizeof(Count) - sizeof(UInt32); - using Summary = PODArrayWithStackMemory; + using Centroids = PODArrayWithStackMemory; - Summary summary; + Centroids centroids; Count count = 0; UInt32 unmerged = 0; - /** Linear interpolation at the point x on the line (x1, y1)..(x2, y2) - */ - static Value interpolate(Value x, Value x1, Value y1, Value x2, Value y2) - { - double k = (x - x1) / (x2 - x1); - return y1 + k * (y2 - y1); - } - struct RadixSortTraits { using Element = Centroid; @@ -122,13 +114,14 @@ class QuantileTDigest */ void addCentroid(const Centroid & c) { - summary.push_back(c); + centroids.push_back(c); count += c.count; ++unmerged; if (unmerged >= params.max_unmerged) compress(); } +public: /** Performs compression of accumulated centroids * When merging, the invariant is retained to the maximum size of each * centroid that does not exceed `4 q (1 - q) \ delta N`. @@ -137,16 +130,16 @@ class QuantileTDigest { if (unmerged > 0) { - RadixSort::executeLSD(summary.data(), summary.size()); + RadixSort::executeLSD(centroids.data(), centroids.size()); - if (summary.size() > 3) + if (centroids.size() > 3) { /// A pair of consecutive bars of the histogram. - auto l = summary.begin(); + auto l = centroids.begin(); auto r = std::next(l); Count sum = 0; - while (r != summary.end()) + while (r != centroids.end()) { // we use quantile which gives us the smallest error @@ -188,14 +181,13 @@ class QuantileTDigest } /// At the end of the loop, all values to the right of l were "eaten". - summary.resize(l - summary.begin() + 1); + centroids.resize(l - centroids.begin() + 1); } unmerged = 0; } } -public: /** Adds to the digest a change in `x` with a weight of `cnt` (default 1) */ void add(T x, UInt64 cnt = 1) @@ -203,17 +195,17 @@ public: addCentroid(Centroid(Value(x), Count(cnt))); } - void merge(const QuantileTDigest & other) + void merge(const TDigest & other) { - for (const auto & c : other.summary) + for (const auto & c : other.centroids) addCentroid(c); } void serialize(WriteBuffer & buf) { compress(); - writeVarUInt(summary.size(), buf); - buf.write(reinterpret_cast(summary.data()), summary.size() * sizeof(summary[0])); + writeVarUInt(centroids.size(), buf); + buf.write(reinterpret_cast(centroids.data()), centroids.size() * sizeof(centroids[0])); } void deserialize(ReadBuffer & buf) @@ -222,36 +214,112 @@ public: readVarUInt(size, buf); if (size > params.max_unmerged) - throw Exception("Too large t-digest summary size", ErrorCodes::TOO_LARGE_ARRAY_SIZE); + throw Exception("Too large t-digest centroids size", ErrorCodes::TOO_LARGE_ARRAY_SIZE); - summary.resize(size); - buf.read(reinterpret_cast(summary.data()), size * sizeof(summary[0])); + centroids.resize(size); + buf.read(reinterpret_cast(centroids.data()), size * sizeof(centroids[0])); count = 0; - for (const auto & c : summary) + for (const auto & c : centroids) count += c.count; } + Count getCount() + { + return count; + } + + const Centroids & getCentroids() const + { + return centroids; + } + + void reset() + { + centroids.resize(0); + count = 0; + unmerged = 0; + } +}; + +template +class QuantileTDigest { + using Value = Float32; + using Count = Float32; + + /** We store two t-digests. When an amount of elements in sub_tdigest become more than merge_threshold + * we merge sub_tdigest in main_tdigest and reset sub_tdigest. This method is needed to decrease an amount of + * centroids in t-digest (experiments show that after merge_threshold the size of t-digest significantly grows, + * but merging two big t-digest decreases it). + */ + TDigest main_tdigest; + TDigest sub_tdigest; + size_t merge_threshold = 1e7; + + /** Linear interpolation at the point x on the line (x1, y1)..(x2, y2) + */ + static Value interpolate(Value x, Value x1, Value y1, Value x2, Value y2) + { + double k = (x - x1) / (x2 - x1); + return y1 + k * (y2 - y1); + } + + void mergeTDigests() + { + main_tdigest.merge(sub_tdigest); + sub_tdigest.reset(); + } + +public: + void add(T x, UInt64 cnt = 1) + { + if (sub_tdigest.getCount() >= merge_threshold) + mergeTDigests(); + sub_tdigest.add(x, cnt); + } + + void merge(const QuantileTDigest & other) + { + mergeTDigests(); + main_tdigest.merge(other.main_tdigest); + main_tdigest.merge(other.sub_tdigest); + } + + void serialize(WriteBuffer & buf) + { + mergeTDigests(); + main_tdigest.serialize(buf); + } + + void deserialize(ReadBuffer & buf) + { + sub_tdigest.reset(); + main_tdigest.deserialize(buf); + } + /** Calculates the quantile q [0, 1] based on the digest. * For an empty digest returns NaN. */ template ResultType getImpl(Float64 level) { - if (summary.empty()) + mergeTDigests(); + + auto & centroids = main_tdigest.getCentroids(); + if (centroids.empty()) return std::is_floating_point_v ? NAN : 0; - compress(); + main_tdigest.compress(); - if (summary.size() == 1) - return summary.front().mean; + if (centroids.size() == 1) + return centroids.front().mean; - Float64 x = level * count; + Float64 x = level * main_tdigest.getCount(); Float64 prev_x = 0; Count sum = 0; - Value prev_mean = summary.front().mean; + Value prev_mean = centroids.front().mean; - for (const auto & c : summary) + for (const auto & c : centroids) { Float64 current_x = sum + c.count * 0.5; @@ -263,7 +331,7 @@ public: prev_x = current_x; } - return summary.back().mean; + return centroids.back().mean; } /** Get multiple quantiles (`size` parts). @@ -274,29 +342,32 @@ public: template void getManyImpl(const Float64 * levels, const size_t * levels_permutation, size_t size, ResultType * result) { - if (summary.empty()) + mergeTDigests(); + + auto & centroids = main_tdigest.getCentroids(); + if (centroids.empty()) { for (size_t result_num = 0; result_num < size; ++result_num) result[result_num] = std::is_floating_point_v ? NAN : 0; return; } - compress(); + main_tdigest.compress(); - if (summary.size() == 1) + if (centroids.size() == 1) { for (size_t result_num = 0; result_num < size; ++result_num) - result[result_num] = summary.front().mean; + result[result_num] = centroids.front().mean; return; } - Float64 x = levels[levels_permutation[0]] * count; + Float64 x = levels[levels_permutation[0]] * main_tdigest.getCount(); Float64 prev_x = 0; Count sum = 0; - Value prev_mean = summary.front().mean; + Value prev_mean = centroids.front().mean; size_t result_num = 0; - for (const auto & c : summary) + for (const auto & c : centroids) { Float64 current_x = sum + c.count * 0.5; @@ -308,7 +379,7 @@ public: if (result_num >= size) return; - x = levels[levels_permutation[result_num]] * count; + x = levels[levels_permutation[result_num]] * main_tdigest.getCount(); } sum += c.count; @@ -316,7 +387,7 @@ public: prev_x = current_x; } - auto rest_of_results = summary.back().mean; + auto rest_of_results = centroids.back().mean; for (; result_num < size; ++result_num) result[levels_permutation[result_num]] = rest_of_results; }