grouping sets development

This commit is contained in:
MaxTheHuman 2021-05-15 19:41:22 +03:00 committed by Dmitry Novik
parent 3418de337a
commit e60d1dd818
3 changed files with 111 additions and 42 deletions

View File

@ -948,15 +948,21 @@ public:
size_t max_bytes_before_external_group_by_,
bool empty_result_for_aggregation_by_empty_set_,
VolumePtr tmp_volume_, size_t max_threads_,
size_t min_free_disk_space_)
size_t min_free_disk_space_,
bool compile_aggregate_expressions_,
size_t min_count_to_compile_aggregate_expression_,
const Block & intermediate_header_ = {})
: src_header(src_header_),
intermediate_header(intermediate_header_),
keys_vector(keys_vector_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
max_bytes_before_external_group_by(max_bytes_before_external_group_by_),
empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_),
tmp_volume(tmp_volume_), max_threads(max_threads_),
min_free_disk_space(min_free_disk_space_)
min_free_disk_space(min_free_disk_space_),
compile_aggregate_expressions(compile_aggregate_expressions_),
min_count_to_compile_aggregate_expression(min_count_to_compile_aggregate_expression_)
{
}

View File

@ -1182,18 +1182,10 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
if (expressions.need_aggregate)
{
if (query.group_by_with_grouping_sets)
{
LOG_DEBUG(log, "About to run executeGroupingSets");
executeGroupingSets(
query_plan, expressions.before_aggregation, aggregate_overflow_row, aggregate_final, query_info.input_order_info);
}
else
{
LOG_DEBUG(log, "About to run executeAggregation");
executeAggregation(
query_plan, expressions.before_aggregation, aggregate_overflow_row, aggregate_final, query_info.input_order_info);
}
LOG_DEBUG(log, "about to run executeAggregation");
executeAggregation(
query_plan, expressions.before_aggregation, aggregate_overflow_row, aggregate_final, query_info.input_order_info);
/// We need to reset input order info, so that executeOrder can't use it
query_info.input_order_info.reset();
}
@ -2064,10 +2056,36 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
const auto & header_before_aggregation = query_plan.getCurrentDataStream().header;
ColumnNumbers keys;
for (const auto & key : query_analyzer->aggregationKeys())
ColumnNumbersTwoDimension keys_vector;
auto & query = getSelectQuery();
if (query.group_by_with_grouping_sets)
{
keys.push_back(header_before_aggregation.getPositionByName(key.name));
LOG_DEBUG(log, "executeAggregation pushed back key with name {} and number {}", key.name, header_before_aggregation.getPositionByName(key.name));
for (const auto & aggregation_keys : query_analyzer->aggregationKeysList())
{
keys.clear();
for (const auto & key : aggregation_keys)
{
keys.push_back(header_before_aggregation.getPositionByName(key.name));
LOG_DEBUG(
log,
"GroupingSets add key with name {} and number {}",
key.name,
header_before_aggregation.getPositionByName(key.name));
}
keys_vector.push_back(keys);
LOG_DEBUG(
log,
"GroupingSets add keys set of size {}",
keys.size());
}
}
else
{
for (const auto & key : query_analyzer->aggregationKeys())
{
keys.push_back(header_before_aggregation.getPositionByName(key.name));
LOG_DEBUG(log, "executeAggregation pushed back key with name {} and number {}", key.name, header_before_aggregation.getPositionByName(key.name));
}
}
AggregateDescriptions aggregates = query_analyzer->aggregates();
@ -2078,29 +2096,72 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
const Settings & settings = context->getSettingsRef();
Aggregator::Params params(
header_before_aggregation,
keys,
aggregates,
overflow_row,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set
|| (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && keys.empty()
&& query_analyzer->hasConstAggregationKeys()),
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression);
// Aggregator::Params params(
// header_before_aggregation,
// keys,
// aggregates,
// overflow_row,
// settings.max_rows_to_group_by,
// settings.group_by_overflow_mode,
// settings.group_by_two_level_threshold,
// settings.group_by_two_level_threshold_bytes,
// settings.max_bytes_before_external_group_by,
// settings.empty_result_for_aggregation_by_empty_set
// || (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && keys.empty()
// && query_analyzer->hasConstAggregationKeys()),
// context->getTemporaryVolume(),
// settings.max_threads,
// settings.min_free_disk_space_for_temporary_data,
// settings.compile_aggregate_expressions,
// settings.min_count_to_compile_aggregate_expression);
std::shared_ptr<Aggregator::Params> params_ptr;
if (query.group_by_with_grouping_sets)
{
params_ptr = std::make_shared<Aggregator::Params>(
header_before_aggregation,
keys_vector,
aggregates,
overflow_row,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set
|| (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && keys.empty()
&& query_analyzer->hasConstAggregationKeys()),
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression);
}
else
{
params_ptr = std::make_shared<Aggregator::Params>(
header_before_aggregation,
keys,
aggregates,
overflow_row,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set
|| (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && keys.empty()
&& query_analyzer->hasConstAggregationKeys()),
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression);
}
SortDescription group_by_sort_description;
if (group_by_info && settings.optimize_aggregation_in_order)
group_by_sort_description = getSortDescriptionFromGroupBy(getSelectQuery());
if (group_by_info && settings.optimize_aggregation_in_order && !query.group_by_with_grouping_sets)
group_by_sort_description = getSortDescriptionFromGroupBy(query);
else
group_by_info = nullptr;
@ -2113,7 +2174,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
auto aggregating_step = std::make_unique<AggregatingStep>(
query_plan.getCurrentDataStream(),
params,
*params_ptr,
final,
settings.max_block_size,
settings.aggregation_in_order_max_block_bytes,
@ -2214,9 +2275,10 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modific
query_plan.addStep(std::move(step));
}
void InterpreterSelectQuery::executeGroupingSets(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info)
/*
void InterpreterSelectQuery::executeGroupingSets(QueryPlan & query_plan, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info)
{
/*
const auto & header_before_transform = query_plan.getCurrentDataStream().header;
ColumnNumbers keys;
@ -2241,7 +2303,7 @@ void InterpreterSelectQuery::executeGroupingSets(QueryPlan & query_plan, const A
step = std::make_unique<GroupingSetsStep>(query_plan.getCurrentDataStream(), std::move(transform_params));
query_plan.addStep(std::move(step));
*/
auto expression_before_aggregation = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), expression);
expression_before_aggregation->setStepDescription("Before GROUP BY");
@ -2322,7 +2384,9 @@ void InterpreterSelectQuery::executeGroupingSets(QueryPlan & query_plan, const A
std::move(group_by_sort_description));
query_plan.addStep(std::move(aggregating_step));
}
*/
void InterpreterSelectQuery::executeExpression(QueryPlan & query_plan, const ActionsDAGPtr & expression, const std::string & description)
{

View File

@ -161,7 +161,6 @@ private:
};
void executeRollupOrCube(QueryPlan & query_plan, Modificator modificator);
void executeGroupingSets(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info);
/** If there is a SETTINGS section in the SELECT query, then apply settings from it.
*