Merge pull request #20599 from ClickHouse/distributed_aggregation_memory_efficient_by_default

Enable distributed_aggregation_memory_efficient by default
This commit is contained in:
alexey-milovidov 2021-02-22 13:09:09 +03:00 committed by GitHub
commit 0afbddd665
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 6 additions and 2 deletions

View File

@ -100,7 +100,7 @@ class IColumn;
M(UInt64, min_count_to_compile_expression, 3, "The number of identical expressions before they are JIT-compiled", 0) \ M(UInt64, min_count_to_compile_expression, 3, "The number of identical expressions before they are JIT-compiled", 0) \
M(UInt64, group_by_two_level_threshold, 100000, "From what number of keys, a two-level aggregation starts. 0 - the threshold is not set.", 0) \ M(UInt64, group_by_two_level_threshold, 100000, "From what number of keys, a two-level aggregation starts. 0 - the threshold is not set.", 0) \
M(UInt64, group_by_two_level_threshold_bytes, 100000000, "From what size of the aggregation state in bytes, a two-level aggregation begins to be used. 0 - the threshold is not set. Two-level aggregation is used when at least one of the thresholds is triggered.", 0) \ M(UInt64, group_by_two_level_threshold_bytes, 100000000, "From what size of the aggregation state in bytes, a two-level aggregation begins to be used. 0 - the threshold is not set. Two-level aggregation is used when at least one of the thresholds is triggered.", 0) \
M(Bool, distributed_aggregation_memory_efficient, false, "Is the memory-saving mode of distributed aggregation enabled.", 0) \ M(Bool, distributed_aggregation_memory_efficient, true, "Is the memory-saving mode of distributed aggregation enabled.", 0) \
M(UInt64, aggregation_memory_efficient_merge_threads, 0, "Number of threads to use for merge intermediate aggregation results in memory efficient mode. When bigger, then more memory is consumed. 0 means - same as 'max_threads'.", 0) \ M(UInt64, aggregation_memory_efficient_merge_threads, 0, "Number of threads to use for merge intermediate aggregation results in memory efficient mode. When bigger, then more memory is consumed. 0 means - same as 'max_threads'.", 0) \
\ \
M(UInt64, max_parallel_replicas, 1, "The maximum number of replicas of each shard used when the query is executed. For consistency (to get different parts of the same partition), this option only works for the specified sampling key. The lag of the replicas is not controlled.", 0) \ M(UInt64, max_parallel_replicas, 1, "The maximum number of replicas of each shard used when the query is executed. For consistency (to get different parts of the same partition), this option only works for the specified sampling key. The lag of the replicas is not controlled.", 0) \

View File

@ -1285,8 +1285,11 @@ void InterpreterSelectQuery::executeFetchColumns(
const auto & desc = query_analyzer->aggregates()[0]; const auto & desc = query_analyzer->aggregates()[0];
const auto & func = desc.function; const auto & func = desc.function;
std::optional<UInt64> num_rows{}; std::optional<UInt64> num_rows{};
if (!query.prewhere() && !query.where()) if (!query.prewhere() && !query.where())
{
num_rows = storage->totalRows(settings); num_rows = storage->totalRows(settings);
}
else // It's possible to optimize count() given only partition predicates else // It's possible to optimize count() given only partition predicates
{ {
SelectQueryInfo temp_query_info; SelectQueryInfo temp_query_info;
@ -1296,6 +1299,7 @@ void InterpreterSelectQuery::executeFetchColumns(
num_rows = storage->totalRowsByPartitionPredicate(temp_query_info, *context); num_rows = storage->totalRowsByPartitionPredicate(temp_query_info, *context);
} }
if (num_rows) if (num_rows)
{ {
AggregateFunctionCount & agg_count = static_cast<AggregateFunctionCount &>(*func); AggregateFunctionCount & agg_count = static_cast<AggregateFunctionCount &>(*func);
@ -1790,7 +1794,7 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool
auto merging_aggregated = std::make_unique<MergingAggregatedStep>( auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
query_plan.getCurrentDataStream(), query_plan.getCurrentDataStream(),
std::move(transform_params), std::move(transform_params),
settings.distributed_aggregation_memory_efficient, settings.distributed_aggregation_memory_efficient && storage && storage->isRemote(),
settings.max_threads, settings.max_threads,
settings.aggregation_memory_efficient_merge_threads); settings.aggregation_memory_efficient_merge_threads);