grouping sets: make simple aggregation with grouping sets work

MaxTheHuman 2021-05-21 17:56:54 +03:00 committed by Dmitry Novik
parent e22cbb7cd8
commit c7c2093bd9
3 changed files with 18 additions and 20 deletions

View File

@@ -381,16 +381,12 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
                 if (!unique_keys.count(key.name))
                 {
                     unique_keys.insert(key.name);
+                    aggregation_keys.push_back(key);
                     if (select_query->group_by_with_grouping_sets)
                     {
                         aggregation_keys_list.push_back({key});
-                        aggregation_keys.push_back(key);
                         LOG_DEBUG(poco_log, "pushed grouping set of 1 column: " + key.name);
                     }
-                    else
-                    {
-                        aggregation_keys.push_back(key);
-                    }
                     /// Key is no longer needed, therefore we can save a little by moving it.
                     aggregated_columns.push_back(std::move(key));
@@ -1662,7 +1658,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
         /// TODO correct conditions
         optimize_aggregation_in_order =
             context->getSettingsRef().optimize_aggregation_in_order
-            && storage && query.groupBy() && !query.group_by_with_grouping_sets;
+            && storage && query.groupBy();
         query_analyzer.appendGroupBy(chain, only_types || !first_stage, optimize_aggregation_in_order, group_by_elements_actions);
         query_analyzer.appendAggregateFunctionsArguments(chain, only_types || !first_stage);
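For readers skimming the ExpressionAnalyzer hunk above: every deduplicated GROUP BY key is now pushed to aggregation_keys unconditionally, and a grouping-sets query additionally records a one-column set in aggregation_keys_list. Below is a minimal standalone sketch of that split, using plain standard-library containers instead of ClickHouse's types (all names are illustrative, not the project's API):

// Standalone sketch (not ClickHouse code): split deduplicated GROUP BY keys into
// a flat key list plus one single-column "grouping set" per key, mirroring the
// aggregation_keys / aggregation_keys_list handling in the hunk above.
#include <iostream>
#include <set>
#include <string>
#include <vector>

int main()
{
    const std::vector<std::string> group_by_keys = {"a", "b", "a"};  // duplicate key on purpose
    const bool group_by_with_grouping_sets = true;

    std::set<std::string> unique_keys;
    std::vector<std::string> aggregation_keys;                    // every distinct key
    std::vector<std::vector<std::string>> aggregation_keys_list;  // one entry per grouping set

    for (const auto & key : group_by_keys)
    {
        if (unique_keys.count(key))
            continue;
        unique_keys.insert(key);

        aggregation_keys.push_back(key);  // collected unconditionally, as in the patched code
        if (group_by_with_grouping_sets)
            aggregation_keys_list.push_back({key});  // grouping set of one column
    }

    std::cout << "keys: " << aggregation_keys.size()
              << ", grouping sets: " << aggregation_keys_list.size() << '\n';
    return 0;
}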

View File

@@ -969,8 +969,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
     bool aggregate_final =
         expressions.need_aggregate &&
         options.to_stage > QueryProcessingStage::WithMergeableState &&
-        !query.group_by_with_totals && !query.group_by_with_rollup && !query.group_by_with_cube &&
-        !query.group_by_with_grouping_sets;
+        !query.group_by_with_totals && !query.group_by_with_rollup && !query.group_by_with_cube;
     if (query_info.projection && query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
     {
@@ -2074,14 +2073,14 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
                 all_keys.push_back(header_before_aggregation.getPositionByName(key.name));
                 LOG_DEBUG(
                     log,
-                    "GroupingSets add key with name {} and number {}",
+                    "execute aggregation with grouping sets add key with name {} and number {}",
                     key.name,
                     header_before_aggregation.getPositionByName(key.name));
             }
             keys_vector.push_back(keys);
             LOG_DEBUG(
                 log,
-                "GroupingSets add keys set of size {}",
+                "execute aggregation with grouping sets add keys set of size {}",
                 keys.size());
         }
     }
@@ -2090,7 +2089,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
         for (const auto & key : query_analyzer->aggregationKeys())
         {
             keys.push_back(header_before_aggregation.getPositionByName(key.name));
-            LOG_DEBUG(log, "executeAggregation pushed back key with name {} and number {}", key.name, header_before_aggregation.getPositionByName(key.name));
+            LOG_DEBUG(log, "execute aggregation without grouping sets pushed back key with name {} and number {}", key.name, header_before_aggregation.getPositionByName(key.name));
         }
     }
@@ -2147,10 +2146,13 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
     }
     SortDescription group_by_sort_description;
-    if (group_by_info && settings.optimize_aggregation_in_order && !query.group_by_with_grouping_sets)
+    if (group_by_info && settings.optimize_aggregation_in_order && !query.group_by_with_grouping_sets) {
         group_by_sort_description = getSortDescriptionFromGroupBy(query);
-    else
+        LOG_DEBUG(log, "execute aggregation without grouping sets got group_by_sort_description");
+    } else {
         group_by_info = nullptr;
+        LOG_DEBUG(log, "execute aggregation didn't get group_by_sort_description");
+    }
     auto merge_threads = max_streams;
     auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads
@@ -2159,7 +2161,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
     bool storage_has_evenly_distributed_read = storage && storage->hasEvenlyDistributedRead();
-    LOG_DEBUG(log, "GroupingSets step header before step structure: {}", query_plan.getCurrentDataStream().header.dumpStructure());
+    LOG_DEBUG(log, "execute aggregation header structure before step: {}", query_plan.getCurrentDataStream().header.dumpStructure());
     auto aggregating_step = std::make_unique<AggregatingStep>(
         query_plan.getCurrentDataStream(),
         *params_ptr,
@@ -2171,7 +2173,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
         storage_has_evenly_distributed_read,
         std::move(group_by_info),
         std::move(group_by_sort_description));
-    LOG_DEBUG(log, "GroupingSets step header after step structure: {}", aggregating_step->getOutputStream().header.dumpStructure());
+    LOG_DEBUG(log, "execute aggregation header structure after step: {}", aggregating_step->getOutputStream().header.dumpStructure());
     query_plan.addStep(std::move(aggregating_step));
 }
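As a side note on the executeAggregation hunks above: the grouping-sets branch resolves each grouping set's key names to column positions, producing one position vector per set (keys_vector) plus the union of all keys (all_keys). Below is a standalone sketch of that mapping, with getPositionByName simulated by a plain map (illustrative code, not ClickHouse's):

// Standalone sketch (not ClickHouse code): resolve each grouping set's key names
// to column positions, producing one position vector per grouping set (keys_vector)
// and the union of all positions (all_keys), as in executeAggregation above.
#include <cstddef>
#include <iostream>
#include <map>
#include <string>
#include <vector>

int main()
{
    // Pretend "header before aggregation": column name -> position.
    const std::map<std::string, std::size_t> position_by_name = {{"a", 0}, {"b", 1}, {"x", 2}};

    // GROUPING SETS ((a), (b)) expressed as lists of key names.
    const std::vector<std::vector<std::string>> aggregation_keys_list = {{"a"}, {"b"}};

    std::vector<std::vector<std::size_t>> keys_vector;  // one entry per grouping set
    std::vector<std::size_t> all_keys;                   // union of keys across all sets

    for (const auto & grouping_set : aggregation_keys_list)
    {
        std::vector<std::size_t> keys;
        for (const auto & name : grouping_set)
        {
            const std::size_t position = position_by_name.at(name);
            keys.push_back(position);
            all_keys.push_back(position);
        }
        keys_vector.push_back(std::move(keys));
    }

    std::cout << "grouping sets: " << keys_vector.size()
              << ", total keys: " << all_keys.size() << '\n';
    return 0;
}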

View File

@@ -84,8 +84,8 @@ void optimizeGroupBy(ASTSelectQuery * select_query, ContextPtr context)
 {
     const FunctionFactory & function_factory = FunctionFactory::instance();
-    if (select_query->group_by_with_grouping_sets)
-        return;
+    // if (select_query->group_by_with_grouping_sets)
+    // return;
     if (!select_query->groupBy())
         return;
@@ -216,7 +216,7 @@ GroupByKeysInfo getGroupByKeysInfo(const ASTs & group_by_keys)
 ///eliminate functions of other GROUP BY keys
 void optimizeGroupByFunctionKeys(ASTSelectQuery * select_query)
 {
-    if (!select_query->groupBy() || select_query->group_by_with_grouping_sets)
+    if (!select_query->groupBy())
         return;
     auto group_by = select_query->groupBy();
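For context on the guard dropped in the hunk above: per its comment, optimizeGroupByFunctionKeys eliminates GROUP BY elements that are functions of other GROUP BY keys, and with this commit the rewrite is no longer skipped for grouping-sets queries. Below is a minimal standalone sketch of the idea, with hard-coded dependency sets (illustrative code, not ClickHouse's):

// Standalone sketch (not ClickHouse code) of the idea behind optimizeGroupByFunctionKeys:
// a GROUP BY element that is a function of other GROUP BY keys adds no grouping
// granularity and can be dropped. Dependency sets are hard-coded for illustration.
#include <iostream>
#include <set>
#include <string>
#include <vector>

struct GroupByElement
{
    std::string text;                  // how the element is written in the query
    std::set<std::string> depends_on;  // columns it reads; empty set = plain column
};

int main()
{
    // GROUP BY a, b, a + b
    const std::vector<GroupByElement> group_by = {
        {"a", {}}, {"b", {}}, {"a + b", {"a", "b"}}};

    std::set<std::string> plain_keys;
    for (const auto & element : group_by)
        if (element.depends_on.empty())
            plain_keys.insert(element.text);

    std::vector<GroupByElement> optimized;
    for (const auto & element : group_by)
    {
        bool covered = !element.depends_on.empty();
        for (const auto & column : element.depends_on)
            covered = covered && plain_keys.count(column);
        if (!covered)
            optimized.push_back(element);  // keep plain keys and uncovered functions
    }

    for (const auto & element : optimized)
        std::cout << element.text << '\n';  // prints: a, b
    return 0;
}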
@@ -246,7 +246,7 @@ void optimizeGroupByFunctionKeys(ASTSelectQuery * select_query)
 /// Eliminates min/max/any-aggregators of functions of GROUP BY keys
 void optimizeAggregateFunctionsOfGroupByKeys(ASTSelectQuery * select_query, ASTPtr & node)
 {
-    if (!select_query->groupBy() || select_query->group_by_with_grouping_sets)
+    if (!select_query->groupBy())
         return;
     const auto & group_by_keys = select_query->groupBy()->children;
@@ -416,7 +416,7 @@ void optimizeMonotonousFunctionsInOrderBy(ASTSelectQuery * select_query, Context
     const TreeRewriterResult & result)
 {
     auto order_by = select_query->orderBy();
-    if (!order_by || select_query->group_by_with_grouping_sets)
+    if (!order_by)
         return;
     /// Do not apply optimization for Distributed and Merge storages,