From 1ec330b80e9635b8533b4bdbf15892840c792928 Mon Sep 17 00:00:00 2001 From: Vladimir Chebotarev Date: Sun, 8 May 2022 20:31:03 +0300 Subject: [PATCH 1/6] Fixed problem with infs in `quantileTDigest`. --- src/AggregateFunctions/QuantileTDigest.h | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/AggregateFunctions/QuantileTDigest.h b/src/AggregateFunctions/QuantileTDigest.h index adb02171f72..d0f43051343 100644 --- a/src/AggregateFunctions/QuantileTDigest.h +++ b/src/AggregateFunctions/QuantileTDigest.h @@ -263,7 +263,7 @@ public: void add(T x, UInt64 cnt = 1) { auto vx = static_cast(x); - if (cnt == 0 || std::isnan(vx)) + if (cnt == 0 || std::isnan(vx) || std::isinf(vx)) return; // Count 0 breaks compress() assumptions, Nan breaks sort(). We treat them as no sample. addCentroid(Centroid{vx, static_cast(cnt)}); } @@ -271,7 +271,8 @@ public: void merge(const QuantileTDigest & other) { for (const auto & c : other.centroids) - addCentroid(c); + if (!std::isnan(c.mean) && !std::isinf(c.mean)) + addCentroid(c); } void serialize(WriteBuffer & buf) @@ -298,7 +299,7 @@ public: for (const auto & c : centroids) { - if (c.count <= 0 || std::isnan(c.count) || std::isnan(c.mean)) // invalid count breaks compress(), invalid mean breaks sort() + if (c.count <= 0 || std::isnan(c.count) || std::isnan(c.mean) || std::isinf(c.mean)) // invalid count breaks compress(), invalid mean breaks sort() throw Exception("Invalid centroid " + std::to_string(c.count) + ":" + std::to_string(c.mean), ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED); count += c.count; } From 825fc0f30a4621810c75440809b5b8486bc419f4 Mon Sep 17 00:00:00 2001 From: Vladimir Chebotarev Date: Wed, 11 May 2022 20:13:36 +0300 Subject: [PATCH 2/6] Fixed TDigest even better. --- src/AggregateFunctions/QuantileTDigest.h | 28 +++++++++++++++++------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/src/AggregateFunctions/QuantileTDigest.h b/src/AggregateFunctions/QuantileTDigest.h index d0f43051343..eda3e52d9db 100644 --- a/src/AggregateFunctions/QuantileTDigest.h +++ b/src/AggregateFunctions/QuantileTDigest.h @@ -103,8 +103,15 @@ class QuantileTDigest */ static Value interpolate(Value x, Value x1, Value y1, Value x2, Value y2) { - double k = (x - x1) / (x2 - x1); - return y1 + k * (y2 - y1); + if (y1 != y2) + { + double k = (x - x1) / (x2 - x1); + return y1 + k * (y2 - y1); + } + else + { + return y1; + } } struct RadixSortTraits @@ -155,7 +162,10 @@ class QuantileTDigest { /// The left column "eats" the right. Middle of the batch l_count += r->count; - l_mean += r->count * (r->mean - l_mean) / l_count; // Symmetric algo (M1*C1 + M2*C2)/(C1+C2) is numerically better, but slower + if (r->mean != l_mean) + { + l_mean += r->count * (r->mean - l_mean) / l_count; // Symmetric algo (M1*C1 + M2*C2)/(C1+C2) is numerically better, but slower + } l->mean = l_mean; l->count = l_count; batch_pos += 1; @@ -230,7 +240,10 @@ public: // it is possible to merge left and right /// The left column "eats" the right. l_count += r->count; - l_mean += r->count * (r->mean - l_mean) / l_count; // Symmetric algo (M1*C1 + M2*C2)/(C1+C2) is numerically better, but slower + if (r->mean != l_mean) + { + l_mean += r->count * (r->mean - l_mean) / l_count; // Symmetric algo (M1*C1 + M2*C2)/(C1+C2) is numerically better, but slower + } l->mean = l_mean; l->count = l_count; } @@ -263,7 +276,7 @@ public: void add(T x, UInt64 cnt = 1) { auto vx = static_cast(x); - if (cnt == 0 || std::isnan(vx) || std::isinf(vx)) + if (cnt == 0 || std::isnan(vx)) return; // Count 0 breaks compress() assumptions, Nan breaks sort(). We treat them as no sample. addCentroid(Centroid{vx, static_cast(cnt)}); } @@ -271,8 +284,7 @@ public: void merge(const QuantileTDigest & other) { for (const auto & c : other.centroids) - if (!std::isnan(c.mean) && !std::isinf(c.mean)) - addCentroid(c); + addCentroid(c); } void serialize(WriteBuffer & buf) @@ -299,7 +311,7 @@ public: for (const auto & c : centroids) { - if (c.count <= 0 || std::isnan(c.count) || std::isnan(c.mean) || std::isinf(c.mean)) // invalid count breaks compress(), invalid mean breaks sort() + if (c.count <= 0 || std::isnan(c.count) || std::isnan(c.mean)) // invalid count breaks compress(), invalid mean breaks sort() throw Exception("Invalid centroid " + std::to_string(c.count) + ":" + std::to_string(c.mean), ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED); count += c.count; } From 009dd618a661274f68f4ca0ea0183cdc6f483b95 Mon Sep 17 00:00:00 2001 From: Vladimir Chebotarev Date: Thu, 12 May 2022 01:49:08 +0300 Subject: [PATCH 3/6] Draft. --- src/AggregateFunctions/QuantileTDigest.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/AggregateFunctions/QuantileTDigest.h b/src/AggregateFunctions/QuantileTDigest.h index eda3e52d9db..7121603af42 100644 --- a/src/AggregateFunctions/QuantileTDigest.h +++ b/src/AggregateFunctions/QuantileTDigest.h @@ -103,7 +103,7 @@ class QuantileTDigest */ static Value interpolate(Value x, Value x1, Value y1, Value x2, Value y2) { - if (y1 != y2) + if (y1 != y2) /// Handling infinities of the same sign well. { double k = (x - x1) / (x2 - x1); return y1 + k * (y2 - y1); @@ -162,7 +162,7 @@ class QuantileTDigest { /// The left column "eats" the right. Middle of the batch l_count += r->count; - if (r->mean != l_mean) + if (r->mean != l_mean) /// Handling infinities of the same sign well. { l_mean += r->count * (r->mean - l_mean) / l_count; // Symmetric algo (M1*C1 + M2*C2)/(C1+C2) is numerically better, but slower } @@ -240,7 +240,7 @@ public: // it is possible to merge left and right /// The left column "eats" the right. l_count += r->count; - if (r->mean != l_mean) + if (r->mean != l_mean) /// Handling infinities of the same sign well. { l_mean += r->count * (r->mean - l_mean) / l_count; // Symmetric algo (M1*C1 + M2*C2)/(C1+C2) is numerically better, but slower } From d519e83a4cd66faa1c9984a4e6be2682d48c1fb8 Mon Sep 17 00:00:00 2001 From: Vladimir Chebotarev Date: Fri, 13 May 2022 22:25:57 +0300 Subject: [PATCH 4/6] Proper work with infinities. --- src/AggregateFunctions/QuantileTDigest.h | 40 +++++++++++++++++------- 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/src/AggregateFunctions/QuantileTDigest.h b/src/AggregateFunctions/QuantileTDigest.h index 7121603af42..16fa3eb3025 100644 --- a/src/AggregateFunctions/QuantileTDigest.h +++ b/src/AggregateFunctions/QuantileTDigest.h @@ -144,6 +144,11 @@ class QuantileTDigest compress(); } + inline bool canBeMerged(const BetterFloat & l_mean, const Value & r_mean) + { + return l_mean == r_mean || (!std::isinf(l_mean) && !std::isinf(r_mean)); + } + void compressBrute() { if (centroids.size() <= params.max_centroids) @@ -156,7 +161,8 @@ class QuantileTDigest BetterFloat l_mean = l->mean; // We have high-precision temporaries for numeric stability BetterFloat l_count = l->count; size_t batch_pos = 0; - for (;r != centroids.end(); ++r) + + for (; r != centroids.end(); ++r) { if (batch_pos < batch_size - 1) { @@ -173,8 +179,11 @@ class QuantileTDigest else { // End of the batch, start the next one - sum += l->count; // Not l_count, otherwise actual sum of elements will be different - ++l; + if (!std::isnan(l->mean)) /// Skip writing batch result if we compressed something to nan. + { + sum += l->count; // Not l_count, otherwise actual sum of elements will be different + ++l; + } /// We skip all the values "eaten" earlier. *l = *r; @@ -183,8 +192,17 @@ class QuantileTDigest batch_pos = 0; } } - count = sum + l_count; // Update count, it might be different due to += inaccuracy - centroids.resize(l - centroids.begin() + 1); + + if (!std::isnan(l->mean)) + { + count = sum + l_count; // Update count, it might be different due to += inaccuracy + centroids.resize(l - centroids.begin() + 1); + } + else /// Skip writing last batch if (super unlikely) it's nan. + { + count = sum; + centroids.resize(l - centroids.begin()); + } // Here centroids.size() <= params.max_centroids } @@ -210,11 +228,8 @@ public: BetterFloat l_count = l->count; while (r != centroids.end()) { - /// N.B. Piece of logic which compresses the same singleton centroids into one centroid is removed - /// because: 1) singleton centroids are being processed in unusual way in recent version of algorithm - /// and such compression would break this logic; - /// 2) we shall not compress centroids further than `max_centroids` parameter requires because - /// this will lead to uneven compression. + /// N.B. We cannot merge all the same values into single centroids because this will lead to + /// unbalanced compression and wrong results. /// For more information see: https://arxiv.org/abs/1902.04023 /// The ratio of the part of the histogram to l, including the half l to the entire histogram. That is, what level quantile in position l. @@ -235,7 +250,7 @@ public: * and at the edges decreases and is approximately equal to the distance to the edge * 4. */ - if (l_count + r->count <= k) + if (l_count + r->count <= k && canBeMerged(l_mean, r->mean)) { // it is possible to merge left and right /// The left column "eats" the right. @@ -267,6 +282,7 @@ public: centroids.resize(l - centroids.begin() + 1); unmerged = 0; } + // Ensures centroids.size() < max_centroids, independent of unprovable floating point blackbox above compressBrute(); } @@ -325,7 +341,7 @@ public: ResultType getImpl(Float64 level) { if (centroids.empty()) - return std::is_floating_point_v ? NAN : 0; + return std::is_floating_point_v ? std::numeric_limits::quiet_NaN() : 0; compress(); From a114ab7223dba131e06ab807f4e51cb3943333f2 Mon Sep 17 00:00:00 2001 From: Vladimir Chebotarev Date: Fri, 13 May 2022 22:38:35 +0300 Subject: [PATCH 5/6] Fixed the bug and supported old states. --- src/AggregateFunctions/QuantileTDigest.h | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/AggregateFunctions/QuantileTDigest.h b/src/AggregateFunctions/QuantileTDigest.h index 16fa3eb3025..c080c6f9749 100644 --- a/src/AggregateFunctions/QuantileTDigest.h +++ b/src/AggregateFunctions/QuantileTDigest.h @@ -103,15 +103,9 @@ class QuantileTDigest */ static Value interpolate(Value x, Value x1, Value y1, Value x2, Value y2) { - if (y1 != y2) /// Handling infinities of the same sign well. - { - double k = (x - x1) / (x2 - x1); - return y1 + k * (y2 - y1); - } - else - { - return y1; - } + /// Symmetric interpolation for better results with infinities. + double k = (x - x1) / (x2 - x1); + return (1 - k) * y1 + k * y2; } struct RadixSortTraits @@ -327,10 +321,17 @@ public: for (const auto & c : centroids) { - if (c.count <= 0 || std::isnan(c.count) || std::isnan(c.mean)) // invalid count breaks compress(), invalid mean breaks sort() + if (c.count <= 0 || std::isnan(c.count)) // invalid count breaks compress() throw Exception("Invalid centroid " + std::to_string(c.count) + ":" + std::to_string(c.mean), ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED); - count += c.count; + if (!std::isnan(c.mean)) + { + count += c.count; + } } + + auto it = std::remove_if(centroids.begin(), centroids.end(), [](Centroid & c) { return std::isnan(c.mean); }); + centroids.erase(it, centroids.end()); + compress(); // Allows reading/writing TDigests with different epsilon/max_centroids params } From 5bbab401c4a38bf87607a9b04ca64e0b44cf17f9 Mon Sep 17 00:00:00 2001 From: Vladimir Chebotarev Date: Sun, 15 May 2022 22:49:52 +0300 Subject: [PATCH 6/6] Added test. --- src/AggregateFunctions/QuantileTDigest.h | 1 - .../02286_quantile_tdigest_infinity.reference | 42 +++++++++++++++ .../02286_quantile_tdigest_infinity.sql | 54 +++++++++++++++++++ 3 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 tests/queries/0_stateless/02286_quantile_tdigest_infinity.reference create mode 100644 tests/queries/0_stateless/02286_quantile_tdigest_infinity.sql diff --git a/src/AggregateFunctions/QuantileTDigest.h b/src/AggregateFunctions/QuantileTDigest.h index c080c6f9749..b5f32bad247 100644 --- a/src/AggregateFunctions/QuantileTDigest.h +++ b/src/AggregateFunctions/QuantileTDigest.h @@ -425,7 +425,6 @@ public: while (current_x >= x) { - if (x <= left) result[levels_permutation[result_num]] = prev_mean; else if (x >= right) diff --git a/tests/queries/0_stateless/02286_quantile_tdigest_infinity.reference b/tests/queries/0_stateless/02286_quantile_tdigest_infinity.reference new file mode 100644 index 00000000000..fd6451177f5 --- /dev/null +++ b/tests/queries/0_stateless/02286_quantile_tdigest_infinity.reference @@ -0,0 +1,42 @@ +1 +[-inf,-inf,-inf,nan,inf,inf,inf] +[-inf,-inf,-inf,nan,inf,inf,inf] +[0,0,0,inf,inf,inf,inf] +[-inf,-inf,-inf,-inf,0,0,0] +[-inf,-inf,-inf,0,inf,inf,inf] +[-inf,-inf,-inf,0,inf,inf,inf] +2 +[-inf] +[-inf] +[inf] +3 +[nan] +[inf] +[nan] +[-inf] +4 +[nan] +[nan] +[nan] +[nan] +[0] +[0] +[0] +5 +6 +inf +inf +-inf +-inf +7 +-inf +-inf +8 +-inf +inf +-inf +-inf +inf +inf +-inf +inf diff --git a/tests/queries/0_stateless/02286_quantile_tdigest_infinity.sql b/tests/queries/0_stateless/02286_quantile_tdigest_infinity.sql new file mode 100644 index 00000000000..d21f7352674 --- /dev/null +++ b/tests/queries/0_stateless/02286_quantile_tdigest_infinity.sql @@ -0,0 +1,54 @@ +SELECT '1'; +SELECT quantilesTDigestArray(0.01, 0.1, 0.25, 0.5, 0.75, 0.9, 0.99)(arrayResize(arrayResize([inf], 500000, -inf), 1000000, inf)); +SELECT quantilesTDigestArray(0.01, 0.1, 0.25, 0.5, 0.75, 0.9, 0.99)(arrayResize(arrayResize([inf], 500000, inf), 1000000, -inf)); +SELECT quantilesTDigestArray(0.01, 0.1, 0.25, 0.5, 0.75, 0.9, 0.99)(arrayResize(arrayResize([inf], 500000, inf), 1000000, 0)); +SELECT quantilesTDigestArray(0.01, 0.1, 0.25, 0.5, 0.75, 0.9, 0.99)(arrayResize(arrayResize([inf], 500000, -inf), 1000000, 0)); +SELECT quantilesTDigestArray(0.01, 0.1, 0.25, 0.5, 0.75, 0.9, 0.99)(arrayResize(arrayResize([0], 500000, inf), 1000000, -inf)); +SELECT quantilesTDigestArray(0.01, 0.1, 0.25, 0.5, 0.75, 0.9, 0.99)(arrayResize(arrayResize([0], 500000, -inf), 1000000, inf)); + +SELECT '2'; +SELECT quantilesTDigest(0.05)(x) FROM (SELECT inf*(number%2-0.5) x FROM numbers(300)); +SELECT quantilesTDigest(0.5)(x) FROM (SELECT inf*(number%2-0.5) x FROM numbers(300)); +SELECT quantilesTDigest(0.95)(x) FROM (SELECT inf*(number%2-0.5) x FROM numbers(300)); + +SELECT '3'; +SELECT quantiles(0.5)(inf) FROM numbers(5); +SELECT quantiles(0.5)(inf) FROM numbers(300); +SELECT quantiles(0.5)(-inf) FROM numbers(5); +SELECT quantiles(0.5)(-inf) FROM numbers(300); + +SELECT '4'; +SELECT quantiles(0.5)(arrayJoin([inf, 0, -inf])); +SELECT quantiles(0.5)(arrayJoin([-inf, 0, inf])); +SELECT quantiles(0.5)(arrayJoin([inf, -inf, 0])); +SELECT quantiles(0.5)(arrayJoin([-inf, inf, 0])); +SELECT quantiles(0.5)(arrayJoin([inf, inf, 0, -inf, -inf, -0])); +SELECT quantiles(0.5)(arrayJoin([inf, -inf, 0, -inf, inf, -0])); +SELECT quantiles(0.5)(arrayJoin([-inf, -inf, 0, inf, inf, -0])); + +SELECT '5'; +DROP TABLE IF EXISTS issue32107; +CREATE TABLE issue32107(A Int64, s_quantiles AggregateFunction(quantilesTDigest(0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99), Float64)) ENGINE = AggregatingMergeTree ORDER BY A; +INSERT INTO issue32107 SELECT A, quantilesTDigestState(0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99)(x) FROM (SELECT 1 A, arrayJoin(cast([2.0, inf, number / 33333],'Array(Float64)')) x FROM numbers(100)) GROUP BY A; +OPTIMIZE TABLE issue32107 FINAL; +DROP TABLE IF EXISTS issue32107; + +SELECT '6'; +SELECT quantileTDigest(inf) FROM numbers(200); +SELECT quantileTDigest(inf) FROM numbers(500); +SELECT quantileTDigest(-inf) FROM numbers(200); +SELECT quantileTDigest(-inf) FROM numbers(500); + +SELECT '7'; +SELECT quantileTDigest(x) FROM (SELECT inf AS x UNION ALL SELECT -inf); +SELECT quantileTDigest(x) FROM (SELECT -inf AS x UNION ALL SELECT inf); + +SELECT '8'; +SELECT quantileTDigest(x) FROM (SELECT inf AS x UNION ALL SELECT -inf UNION ALL SELECT -inf); +SELECT quantileTDigest(x) FROM (SELECT inf AS x UNION ALL SELECT inf UNION ALL SELECT -inf); +SELECT quantileTDigest(x) FROM (SELECT -inf AS x UNION ALL SELECT -inf UNION ALL SELECT -inf); +SELECT quantileTDigest(x) FROM (SELECT -inf AS x UNION ALL SELECT inf UNION ALL SELECT -inf); +SELECT quantileTDigest(x) FROM (SELECT inf AS x UNION ALL SELECT -inf UNION ALL SELECT inf); +SELECT quantileTDigest(x) FROM (SELECT inf AS x UNION ALL SELECT inf UNION ALL SELECT inf); +SELECT quantileTDigest(x) FROM (SELECT -inf AS x UNION ALL SELECT -inf UNION ALL SELECT inf); +SELECT quantileTDigest(x) FROM (SELECT -inf AS x UNION ALL SELECT inf UNION ALL SELECT inf);