From fa200160915ee9c187e5e64a4a1e395d70430b7f Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 17 Feb 2021 09:53:18 +0300 Subject: [PATCH 1/3] Enable distributed_aggregation_memory_efficient by default --- src/Core/Settings.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 9bb9ad30f15..6c05d247037 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -100,7 +100,7 @@ class IColumn; M(UInt64, min_count_to_compile_expression, 3, "The number of identical expressions before they are JIT-compiled", 0) \ M(UInt64, group_by_two_level_threshold, 100000, "From what number of keys, a two-level aggregation starts. 0 - the threshold is not set.", 0) \ M(UInt64, group_by_two_level_threshold_bytes, 100000000, "From what size of the aggregation state in bytes, a two-level aggregation begins to be used. 0 - the threshold is not set. Two-level aggregation is used when at least one of the thresholds is triggered.", 0) \ - M(Bool, distributed_aggregation_memory_efficient, false, "Is the memory-saving mode of distributed aggregation enabled.", 0) \ + M(Bool, distributed_aggregation_memory_efficient, true, "Is the memory-saving mode of distributed aggregation enabled.", 0) \ M(UInt64, aggregation_memory_efficient_merge_threads, 0, "Number of threads to use for merge intermediate aggregation results in memory efficient mode. When bigger, then more memory is consumed. 0 means - same as 'max_threads'.", 0) \ \ M(UInt64, max_parallel_replicas, 1, "The maximum number of replicas of each shard used when the query is executed. For consistency (to get different parts of the same partition), this option only works for the specified sampling key. 
The lag of the replicas is not controlled.", 0) \ From 0ab14120ef311ec7ff614b08a25268fb078cc7e5 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 21 Feb 2021 23:06:31 +0300 Subject: [PATCH 2/3] Improve performance of trivial count query in presence of "distributed_aggregation_memory_efficient" --- src/Interpreters/InterpreterSelectQuery.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 3008c55973d..da6ad7ab102 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1285,8 +1285,11 @@ void InterpreterSelectQuery::executeFetchColumns( const auto & desc = query_analyzer->aggregates()[0]; const auto & func = desc.function; std::optional num_rows{}; + if (!query.prewhere() && !query.where()) + { num_rows = storage->totalRows(settings); + } else // It's possible to optimize count() given only partition predicates { SelectQueryInfo temp_query_info; @@ -1296,6 +1299,7 @@ void InterpreterSelectQuery::executeFetchColumns( num_rows = storage->totalRowsByPartitionPredicate(temp_query_info, *context); } + if (num_rows) { AggregateFunctionCount & agg_count = static_cast(*func); @@ -1790,7 +1794,7 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool auto merging_aggregated = std::make_unique( query_plan.getCurrentDataStream(), std::move(transform_params), - settings.distributed_aggregation_memory_efficient, + settings.distributed_aggregation_memory_efficient && storage && storage->isRemote(), settings.max_threads, settings.aggregation_memory_efficient_merge_threads); From d7f017c4ddfabaf0e0ba972491ba1495a17e445c Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 21 Feb 2021 23:06:31 +0300 Subject: [PATCH 3/3] Improve performance of trivial count query in presence of "distributed_aggregation_memory_efficient" --- src/Interpreters/InterpreterSelectQuery.cpp | 6 
+++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 9f97160f77f..370e7224542 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1269,8 +1269,11 @@ void InterpreterSelectQuery::executeFetchColumns( const auto & desc = query_analyzer->aggregates()[0]; const auto & func = desc.function; std::optional num_rows{}; + if (!query.prewhere() && !query.where()) + { num_rows = storage->totalRows(settings); + } else // It's possible to optimize count() given only partition predicates { SelectQueryInfo temp_query_info; @@ -1280,6 +1283,7 @@ void InterpreterSelectQuery::executeFetchColumns( num_rows = storage->totalRowsByPartitionPredicate(temp_query_info, *context); } + if (num_rows) { AggregateFunctionCount & agg_count = static_cast(*func); @@ -1774,7 +1778,7 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool auto merging_aggregated = std::make_unique( query_plan.getCurrentDataStream(), std::move(transform_params), - settings.distributed_aggregation_memory_efficient, + settings.distributed_aggregation_memory_efficient && storage && storage->isRemote(), settings.max_threads, settings.aggregation_memory_efficient_merge_threads);