1. refactor ExpressionAnalyzer

2. Update formatAST to support grouping sets query with distributed table
3. modify astExpression to support function in grouping sets
This commit is contained in:
fanzhou 2021-09-11 11:08:22 +08:00 committed by Dmitry Novik
parent e725630fbf
commit b94f8878ff
8 changed files with 214 additions and 168 deletions

View File

@ -351,15 +351,10 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
if (select_query->group_by_with_grouping_sets)
{
ASTs group_elements_ast;
if (group_asts[i]->as<ASTExpressionList>())
{
group_elements_ast = group_asts[i]->as<const ASTExpressionList>()->children;
}
else
{
const auto id_ast = group_asts[i]->as<const ASTIdentifier &>();
group_elements_ast.push_back(std::make_shared<ASTIdentifier>(id_ast));
}
const ASTExpressionList * group_ast_element = group_asts[i]->as<const ASTExpressionList>();
if (!group_ast_element)
throw Exception("Grouping Sets element " + group_asts[i]->getColumnName() + " should be an expression type", ErrorCodes::UNKNOWN_IDENTIFIER);
group_elements_ast = group_ast_element->children;
NamesAndTypesList grouping_set_list;

View File

@ -2037,7 +2037,6 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
}
}
void InterpreterSelectQuery::executeWhere(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool remove_filter)
{
auto where_step = std::make_unique<FilterStep>(
@ -2047,6 +2046,79 @@ void InterpreterSelectQuery::executeWhere(QueryPlan & query_plan, const ActionsD
query_plan.addStep(std::move(where_step));
}
void InterpreterSelectQuery::initAggregatorParams(
const Block & current_data_stream_header,
AggregatorParamsPtr & params_ptr,
const AggregateDescriptions & aggregates,
bool overflow_row, const Settings & settings,
size_t group_by_two_level_threshold, size_t group_by_two_level_threshold_bytes)
{
auto & query = getSelectQuery();
if (query.group_by_with_grouping_sets)
{
ColumnNumbers keys;
ColumnNumbers all_keys;
ColumnNumbersList keys_vector;
std::unordered_set<size_t> keys_set;
for (const auto & aggregation_keys : query_analyzer->aggregationKeysList())
{
keys.clear();
for (const auto & key : aggregation_keys)
{
size_t key_name_pos = current_data_stream_header.getPositionByName(key.name);
keys_set.insert(key_name_pos);
keys.push_back(key_name_pos);
}
keys_vector.push_back(keys);
}
all_keys.assign(keys_set.begin(), keys_set.end());
params_ptr = std::make_unique<Aggregator::Params>(
current_data_stream_header,
all_keys,
keys_vector,
aggregates,
overflow_row,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
group_by_two_level_threshold,
group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set
|| (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && keys.empty()
&& query_analyzer->hasConstAggregationKeys()),
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression);
}
else
{
ColumnNumbers keys;
for (const auto & key : query_analyzer->aggregationKeys())
keys.push_back(current_data_stream_header.getPositionByName(key.name));
params_ptr = std::make_unique<Aggregator::Params>(
current_data_stream_header,
keys,
aggregates,
overflow_row,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
group_by_two_level_threshold,
group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set
|| (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && keys.empty()
&& query_analyzer->hasConstAggregationKeys()),
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression);
}
}
void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info)
{
@ -2064,78 +2136,17 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
if (descr.arguments.empty())
for (const auto & name : descr.argument_names)
descr.arguments.push_back(header_before_aggregation.getPositionByName(name));
const Settings & settings = context->getSettingsRef();
std::shared_ptr<Aggregator::Params> params_ptr;
auto & query = getSelectQuery();
if (query.group_by_with_grouping_sets)
{
ColumnNumbers keys;
ColumnNumbers all_keys;
ColumnNumbersList keys_vector;
std::unordered_set<size_t> keys_set;
for (const auto & aggregation_keys : query_analyzer->aggregationKeysList())
{
keys.clear();
for (const auto & key : aggregation_keys)
{
size_t key_name_pos = header_before_aggregation.getPositionByName(key.name);
keys_set.insert(key_name_pos);
keys.push_back(key_name_pos);
}
keys_vector.push_back(keys);
}
all_keys.assign(keys_set.begin(), keys_set.end());
AggregatorParamsPtr params_ptr;
initAggregatorParams(header_before_aggregation, params_ptr, aggregates, overflow_row, settings,
settings.group_by_two_level_threshold, settings.group_by_two_level_threshold_bytes);
params_ptr = std::make_shared<Aggregator::Params>(
header_before_aggregation,
all_keys,
keys_vector,
aggregates,
overflow_row,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set
|| (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && keys.empty()
&& query_analyzer->hasConstAggregationKeys()),
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression);
}
else
{
ColumnNumbers keys;
for (const auto & key : query_analyzer->aggregationKeys())
keys.push_back(header_before_aggregation.getPositionByName(key.name));
params_ptr = std::make_shared<Aggregator::Params>(
header_before_aggregation,
keys,
aggregates,
overflow_row,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set
|| (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && keys.empty()
&& query_analyzer->hasConstAggregationKeys()),
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression);
}
SortDescription group_by_sort_description;
if (group_by_info && settings.optimize_aggregation_in_order && !query.group_by_with_grouping_sets)
group_by_sort_description = getSortDescriptionFromGroupBy(query);
if (group_by_info && settings.optimize_aggregation_in_order)
group_by_sort_description = getSortDescriptionFromGroupBy(getSelectQuery());
else
group_by_info = nullptr;
@ -2208,76 +2219,14 @@ void InterpreterSelectQuery::executeTotalsAndHaving(
query_plan.addStep(std::move(totals_having_step));
}
void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modificator modificator)
{
const auto & header_before_transform = query_plan.getCurrentDataStream().header;
const Settings & settings = context->getSettingsRef();
std::shared_ptr<Aggregator::Params> params_ptr;
auto & query = getSelectQuery();
if (query.group_by_with_grouping_sets)
{
ColumnNumbers keys;
ColumnNumbers all_keys;
ColumnNumbersList keys_vector;
std::unordered_set<size_t> keys_set;
for (const auto & aggregation_keys : query_analyzer->aggregationKeysList())
{
keys.clear();
for (const auto & key : aggregation_keys)
{
size_t key_name_pos = header_before_transform.getPositionByName(key.name);
keys_set.insert(key_name_pos);
keys.push_back(key_name_pos);
}
keys_vector.push_back(keys);
}
all_keys.assign(keys_set.begin(), keys_set.end());
params_ptr = std::make_shared<Aggregator::Params>(
header_before_transform,
all_keys,
keys_vector,
query_analyzer->aggregates(),
false,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
0,
0,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression);
}
else
{
ColumnNumbers keys;
for (const auto & key : query_analyzer->aggregationKeys())
keys.push_back(header_before_transform.getPositionByName(key.name));
params_ptr = std::make_shared<Aggregator::Params>(
header_before_transform,
keys,
query_analyzer->aggregates(),
false,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
0,
0,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression);
}
AggregatorParamsPtr params_ptr;
initAggregatorParams(header_before_transform, params_ptr, query_analyzer->aggregates(), false, settings, 0, 0);
auto transform_params = std::make_shared<AggregatingTransformParams>(*params_ptr, true);
QueryPlanStepPtr step;

View File

@ -30,6 +30,7 @@ class QueryPlan;
struct TreeRewriterResult;
using TreeRewriterResultPtr = std::shared_ptr<const TreeRewriterResult>;
using AggregatorParamsPtr = std::unique_ptr<Aggregator::Params>;
/** Interprets the SELECT query. Returns the stream of blocks with the results of the query before `to_stage` stage.
@ -127,6 +128,12 @@ private:
/// Different stages of query execution.
void initAggregatorParams(
const Block & current_data_stream_header,
AggregatorParamsPtr & params_ptr,
const AggregateDescriptions & aggregates,
bool overflow_row, const Settings & settings,
size_t group_by_two_level_threshold, size_t group_by_two_level_threshold_bytes);
void executeFetchColumns(QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan);
void executeWhere(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool remove_filter);
void executeAggregation(

View File

@ -17,16 +17,38 @@ void ASTExpressionList::formatImpl(const FormatSettings & settings, FormatState
if (frame.expression_list_prepend_whitespace)
settings.ostr << ' ';
for (ASTs::const_iterator it = children.begin(); it != children.end(); ++it)
if (frame.need_parens)
{
if (it != children.begin())
settings.ostr << "(";
for (ASTs::const_iterator it = children.begin(); it != children.end(); ++it)
{
if (separator)
settings.ostr << separator;
settings.ostr << ' ';
}
if (it != children.begin())
{
if (separator)
settings.ostr << separator;
settings.ostr << ' ';
}
(*it)->formatImpl(settings, state, frame);
settings.ostr << "(";
FormatStateStacked frame_nested = frame;
frame_nested.need_parens = false;
(*it)->formatImpl(settings, state, frame_nested);
settings.ostr << ")";
}
settings.ostr << ")";
}
else
{
for (ASTs::const_iterator it = children.begin(); it != children.end(); ++it)
{
if (it != children.begin())
{
if (separator)
settings.ostr << separator;
settings.ostr << ' ';
}
(*it)->formatImpl(settings, state, frame);
}
}
}
@ -41,20 +63,53 @@ void ASTExpressionList::formatImplMultiline(const FormatSettings & settings, For
}
++frame.indent;
for (ASTs::const_iterator it = children.begin(); it != children.end(); ++it)
if (frame.need_parens)
{
if (it != children.begin())
for (ASTs::const_iterator it = children.begin(); it != children.end(); ++it)
{
if (separator)
settings.ostr << separator;
if (it != children.begin())
{
if (separator)
settings.ostr << separator;
}
if (children.size() > 1 || frame.expression_list_always_start_on_new_line)
settings.ostr << indent_str;
if (it == children.begin())
{
settings.ostr << "(";
}
FormatStateStacked frame_nested = frame;
frame_nested.expression_list_always_start_on_new_line = false;
frame_nested.expression_list_prepend_whitespace = false;
frame_nested.need_parens = false;
settings.ostr << "(";
(*it)->formatImpl(settings, state, frame_nested);
settings.ostr << ")";
}
settings.ostr << ")";
}
else
{
for (ASTs::const_iterator it = children.begin(); it != children.end(); ++it)
{
if (it != children.begin())
{
if (separator)
settings.ostr << separator;
}
if (children.size() > 1 || frame.expression_list_always_start_on_new_line)
settings.ostr << indent_str;
if (children.size() > 1 || frame.expression_list_always_start_on_new_line)
settings.ostr << indent_str;
FormatStateStacked frame_nested = frame;
frame_nested.expression_list_always_start_on_new_line = false;
(*it)->formatImpl(settings, state, frame_nested);
FormatStateStacked frame_nested = frame;
frame_nested.expression_list_always_start_on_new_line = false;
(*it)->formatImpl(settings, state, frame_nested);
}
}
}

View File

@ -114,9 +114,12 @@ void ASTSelectQuery::formatImpl(const FormatSettings & s, FormatState & state, F
if (groupBy())
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "GROUP BY" << (s.hilite ? hilite_none : "");
s.one_line
if (!group_by_with_grouping_sets)
{
s.one_line
? groupBy()->formatImpl(s, state, frame)
: groupBy()->as<ASTExpressionList &>().formatImplMultiline(s, state, frame);
}
}
if (group_by_with_rollup)
@ -126,7 +129,15 @@ void ASTSelectQuery::formatImpl(const FormatSettings & s, FormatState & state, F
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << (s.one_line ? "" : " ") << "WITH CUBE" << (s.hilite ? hilite_none : "");
if (group_by_with_grouping_sets)
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << (s.one_line ? "" : " ") << "WITH GROUPING SETS" << (s.hilite ? hilite_none : "");
{
bool tmp_need_parens = frame.need_parens;
frame.need_parens = true;
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << (s.one_line ? "" : " ") << "GROUPING SETS" << (s.hilite ? hilite_none : "");
s.one_line
? groupBy()->formatImpl(s, state, frame)
: groupBy()->as<ASTExpressionList &>().formatImplMultiline(s, state, frame);
frame.need_parens = tmp_need_parens;
}
if (group_by_with_totals)
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << (s.one_line ? "" : " ") << "WITH TOTALS" << (s.hilite ? hilite_none : "");

View File

@ -762,7 +762,7 @@ bool ParserGroupingSetsExpressionListElements::parseImpl(Pos & pos, ASTPtr & nod
ParserToken s_close(TokenType::ClosingRoundBracket);
ParserExpressionWithOptionalAlias p_expression(false);
ParserList p_command(std::make_unique<ParserExpressionWithOptionalAlias>(false),
std::make_unique<ParserToken>(TokenType::Comma), false);
std::make_unique<ParserToken>(TokenType::Comma), true);
do
{
@ -775,6 +775,9 @@ bool ParserGroupingSetsExpressionListElements::parseImpl(Pos & pos, ASTPtr & nod
{
return false;
}
auto list = std::make_shared<ASTExpressionList>(',');
list->children.push_back(command);
command = std::move(list);
}
else
{

View File

@ -1,3 +1,5 @@
1 0 1 4500
1 0 3 4700
1 0 5 4900
@ -40,6 +42,28 @@
2 4 0 0 4800
2 5 0 0 5400
0 0 0 49500
0 0 1 4500
0 0 2 4600
0 0 3 4700
0 0 4 4800
0 0 5 4900
0 0 6 5000
0 0 7 5100
0 0 8 5200
0 0 9 5300
0 0 10 5400
1 1 0 4500
1 2 0 5100
1 3 0 4700
1 4 0 5300
1 5 0 4900
2 1 0 5000
2 2 0 4600
2 3 0 5200
2 4 0 4800
2 5 0 5400
1 0 24500
1 1 4500
1 3 4700

View File

@ -2,6 +2,8 @@ DROP TABLE IF EXISTS grouping_sets;
CREATE TABLE grouping_sets(fact_1_id Int32, fact_2_id Int32, fact_3_id Int32, fact_4_id Int32, sales_value Int32) ENGINE = Memory;
SELECT fact_1_id, fact_3_id, sum(sales_value), count() from grouping_sets GROUP BY GROUPING SETS(fact_1_id, fact_3_id) ORDER BY fact_1_id, fact_3_id;
INSERT INTO grouping_sets
SELECT
number % 2 + 1 AS fact_1_id,
@ -12,19 +14,23 @@ SELECT
FROM system.numbers limit 1000;
SELECT fact_1_id, fact_2_id, fact_3_id, SUM(sales_value) AS sales_value from grouping_sets
GROUP BY GROUPING SETS((fact_1_id, fact_2_id), (fact_1_id, fact_3_id))
GROUP BY GROUPING SETS ((fact_1_id, fact_2_id), (fact_1_id, fact_3_id))
ORDER BY fact_1_id, fact_2_id, fact_3_id;
SELECT fact_1_id, fact_2_id, fact_3_id, fact_4_id, SUM(sales_value) AS sales_value from grouping_sets
GROUP BY GROUPING SETS((fact_1_id, fact_2_id), (fact_3_id, fact_4_id))
GROUP BY GROUPING SETS ((fact_1_id, fact_2_id), (fact_3_id, fact_4_id))
ORDER BY fact_1_id, fact_2_id, fact_3_id, fact_4_id;
SELECT fact_1_id, fact_2_id, fact_3_id, SUM(sales_value) AS sales_value from grouping_sets
GROUP BY GROUPING SETS ((fact_1_id, fact_2_id), (fact_3_id), ())
ORDER BY fact_1_id, fact_2_id, fact_3_id;
SELECT
fact_1_id,
fact_3_id,
SUM(sales_value) AS sales_value
FROM grouping_sets
GROUP BY grouping sets((fact_1_id), (fact_1_id, fact_3_id)) WITH TOTALS
GROUP BY grouping sets ((fact_1_id), (fact_1_id, fact_3_id)) WITH TOTALS
ORDER BY fact_1_id, fact_3_id;
SELECT
@ -32,11 +38,7 @@ SELECT
fact_3_id,
SUM(sales_value) AS sales_value
FROM grouping_sets
GROUP BY grouping sets(fact_1_id, (fact_1_id, fact_3_id)) WITH TOTALS
GROUP BY grouping sets (fact_1_id, (fact_1_id, fact_3_id)) WITH TOTALS
ORDER BY fact_1_id, fact_3_id;
truncate grouping_sets;
SELECT fact_1_id, fact_3_id, sum(sales_value), count() from grouping_sets GROUP BY GROUPING SETS(fact_1_id, fact_3_id) ORDER BY fact_1_id, fact_3_id;
DROP TABLE grouping_sets;