diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index f8b41104130..107d2d5517e 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -1,10 +1,12 @@ #include #include #include +#include #include #include #include #include +#include #include #include #include @@ -1336,21 +1338,37 @@ bool ReadFromMergeTree::requestOutputEachPartitionThroughSeparatePort() "Independent aggregation by partitions won't be used because there are too many of them: {}. You can increase " "max_number_of_partitions_for_independent_aggregation (current value is {}) or set " "force_aggregate_partitions_independently to suppress this check", - settings.max_number_of_partitions_for_independent_aggregation, - partitions_cnt); + partitions_cnt, + settings.max_number_of_partitions_for_independent_aggregation); return false; } - /// todo: check how big is the difference in the number of rows between parts and disable optimization if it is > 2. - /*if (!settings.force_aggregate_partitions_independently) + if (!settings.force_aggregate_partitions_independently) { - LOG_TRACE( - log, - "Independent aggregation by partitions won't be used because there are too big skew in the number of rows between partitions: " - "{} {} ." - "You can set force_aggregate_partitions_independently to suppress this check"); - return false; - }*/ + std::unordered_map partition_rows; + for (const auto & part : prepared_parts) + partition_rows[part->info.partition_id] += part->rows_count; + size_t sum_rows = 0; + size_t max_rows = 0; + for (const auto & [_, rows] : partition_rows) + { + sum_rows += rows; + max_rows = std::max(max_rows, rows); + } + + /// Merging shouldn't take more time than preaggregation in normal cases. And exec time is proportional to the amount of data. 
+ /// We assume that the execution time of independent aggregation is proportional to the size of the largest partition, and that the + /// execution time of ordinary aggregation is proportional to the sum of sizes divided by the number of threads and multiplied by two (preaggregation + merging). + const size_t avg_rows_in_partition = sum_rows / settings.max_threads; + if (max_rows > avg_rows_in_partition * 2) + { + LOG_TRACE( + log, + "Independent aggregation by partitions won't be used because there are too big skew in the number of rows between " + "partitions. You can set force_aggregate_partitions_independently to suppress this check"); + return false; + } + } return output_each_partition_through_separate_port = true; } diff --git a/tests/queries/0_stateless/02521_aggregation_by_partitions.reference b/tests/queries/0_stateless/02521_aggregation_by_partitions.reference index 121ebd52e5a..4a0967fb8ba 100644 --- a/tests/queries/0_stateless/02521_aggregation_by_partitions.reference +++ b/tests/queries/0_stateless/02521_aggregation_by_partitions.reference @@ -203,3 +203,6 @@ Skip merging: 0 Skip merging: 1 Skip merging: 0 Skip merging: 1 +Skip merging: 0 +Skip merging: 0 +Skip merging: 0 diff --git a/tests/queries/0_stateless/02521_aggregation_by_partitions.sql b/tests/queries/0_stateless/02521_aggregation_by_partitions.sql index 919ac374024..1c139e94591 100644 --- a/tests/queries/0_stateless/02521_aggregation_by_partitions.sql +++ b/tests/queries/0_stateless/02521_aggregation_by_partitions.sql @@ -170,3 +170,39 @@ select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from ( ) where explain like '%Skip merging: %'; drop table t14; + +-- too few partitions -- +create table t15(a UInt32, b UInt32) engine=MergeTree order by a partition by a < 90; + +insert into t15 select number, number from numbers_mt(100); + +select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from ( + explain actions=1 select a from t15 group by a +) where explain like '%Skip merging: %' +settings force_aggregate_partitions_independently = 0; + +drop table 
t15; + +-- too many partitions: 16 partitions vs max_number_of_partitions_for_independent_aggregation = 4 -- +create table t16(a UInt32, b UInt32) engine=MergeTree order by a partition by a % 16; + +insert into t16 select number, number from numbers_mt(100); + +select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from ( + explain actions=1 select a from t16 group by a +) where explain like '%Skip merging: %' +settings force_aggregate_partitions_independently = 0, max_number_of_partitions_for_independent_aggregation = 4; + +drop table t16; + +-- too big a skew: partitions of 90 and 10 rows with max_threads = 4 -- +create table t17(a UInt32, b UInt32) engine=MergeTree order by a partition by a < 90; + +insert into t17 select number, number from numbers_mt(100); + +select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from ( + explain actions=1 select a from t17 group by a +) where explain like '%Skip merging: %' +settings force_aggregate_partitions_independently = 0, max_threads = 4; + +drop table t17;