few heuristics to disable optimisation

This commit is contained in:
Nikita Taranov 2023-01-18 14:22:32 +00:00
parent 3ce725023c
commit 47f57b7a5d
3 changed files with 68 additions and 11 deletions

View File

@ -1,10 +1,12 @@
#include <algorithm>
#include <functional>
#include <iterator>
#include <limits>
#include <memory>
#include <numeric>
#include <queue>
#include <stdexcept>
#include <unordered_map>
#include <IO/Operators.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionAnalyzer.h>
@ -1336,21 +1338,37 @@ bool ReadFromMergeTree::requestOutputEachPartitionThroughSeparatePort()
"Independent aggregation by partitions won't be used because there are too many of them: {}. You can increase "
"max_number_of_partitions_for_independent_aggregation (current value is {}) or set "
"force_aggregate_partitions_independently to suppress this check",
settings.max_number_of_partitions_for_independent_aggregation,
partitions_cnt);
partitions_cnt,
settings.max_number_of_partitions_for_independent_aggregation);
return false;
}
/// todo: check how big is the difference in the number of rows between parts and disable optimization if it is > 2.
/*if (!settings.force_aggregate_partitions_independently)
if (!settings.force_aggregate_partitions_independently)
{
LOG_TRACE(
log,
"Independent aggregation by partitions won't be used because there are too big skew in the number of rows between partitions: "
"{} {} ."
"You can set force_aggregate_partitions_independently to suppress this check");
return false;
}*/
std::unordered_map<String, size_t> partition_rows;
for (const auto & part : prepared_parts)
partition_rows[part->info.partition_id] += part->rows_count;
size_t sum_rows = 0;
size_t max_rows = 0;
for (const auto & [_, rows] : partition_rows)
{
sum_rows += rows;
max_rows = std::max(max_rows, rows);
}
/// Merging shouldn't take more time than preaggregation in normal cases. And exec time is proportional to the amount of data.
/// We assume that exec time of independent aggr is proportional to the maximum of sizes and
/// exec time of ordinary aggr is proportional to sum of sizes divided by number of threads and multiplied by two (preaggregation + merging).
const size_t avg_rows_in_partition = sum_rows / settings.max_threads;
if (max_rows > avg_rows_in_partition * 2)
{
LOG_TRACE(
log,
"Independent aggregation by partitions won't be used because there are too big skew in the number of rows between "
"partitions. You can set force_aggregate_partitions_independently to suppress this check");
return false;
}
}
return output_each_partition_through_separate_port = true;
}

View File

@ -203,3 +203,6 @@ Skip merging: 0
Skip merging: 1
Skip merging: 0
Skip merging: 1
Skip merging: 0
Skip merging: 0
Skip merging: 0

View File

@ -170,3 +170,39 @@ select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from (
) where explain like '%Skip merging: %';
drop table t14;
-- to few partitions --
create table t15(a UInt32, b UInt32) engine=MergeTree order by a partition by a < 90;
insert into t15 select number, number from numbers_mt(100);
select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from (
explain actions=1 select a from t15 group by a
) where explain like '%Skip merging: %'
settings force_aggregate_partitions_independently = 0;
drop table t15;
-- to many partitions --
create table t16(a UInt32, b UInt32) engine=MergeTree order by a partition by a % 16;
insert into t16 select number, number from numbers_mt(100);
select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from (
explain actions=1 select a from t16 group by a
) where explain like '%Skip merging: %'
settings force_aggregate_partitions_independently = 0, max_number_of_partitions_for_independent_aggregation = 4;
drop table t16;
-- to big skew --
create table t17(a UInt32, b UInt32) engine=MergeTree order by a partition by a < 90;
insert into t17 select number, number from numbers_mt(100);
select replaceRegexpOne(explain, '^[ ]*(.*)', '\\1') from (
explain actions=1 select a from t17 group by a
) where explain like '%Skip merging: %'
settings force_aggregate_partitions_independently = 0, max_threads = 4;
drop table t17;