Planner small fixes

This commit is contained in:
Maksim Kita 2023-01-13 17:53:53 +01:00
parent 13d4d40568
commit 2c56b0b2b9
9 changed files with 134 additions and 27 deletions

View File

@ -1112,6 +1112,7 @@ void Planner::buildPlanForQueryNode()
checkStoragesSupportTransactions(planner_context);
collectTableExpressionData(query_tree, *planner_context);
collectSets(query_tree, *planner_context);
resolveGroupingFunctions(query_tree, *planner_context);
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;

View File

@ -91,6 +91,7 @@ public:
QueryTreeNodeWeakPtr column_source;
auto grouping_set_argument_column = std::make_shared<ColumnNode>(NameAndTypePair{"__grouping_set", std::make_shared<DataTypeUInt64>()}, column_source);
auto previous_arguments = function_node->getArguments().getNodes();
function_node->getArguments().getNodes().clear();
bool force_grouping_standard_compatibility = planner_context.getQueryContext()->getSettingsRef().force_grouping_standard_compatibility;
@ -129,6 +130,9 @@ public:
break;
}
}
auto & function_node_arguments = function_node->getArguments().getNodes();
function_node_arguments.insert(function_node_arguments.end(), previous_arguments.begin(), previous_arguments.end());
}
static bool needChildVisit(const QueryTreeNodePtr &, const QueryTreeNodePtr & child_node)
@ -163,8 +167,6 @@ void resolveGroupingFunctions(QueryTreeNodePtr & node,
visitor.visit(query_node_typed.getProjectionNode());
}
}
void resolveGroupingFunctions(QueryTreeNodePtr & query_node,
const Names & aggregation_keys,
const GroupingSetsParamsList & grouping_sets_parameters_list,
@ -183,6 +185,87 @@ void resolveGroupingFunctions(QueryTreeNodePtr & query_node,
resolveGroupingFunctions(query_node, group_by_kind, aggregation_keys, grouping_sets_parameters_list, planner_context);
}
}
/** Resolve GROUPING functions in query node using aggregation keys derived from its GROUP BY clause.
  *
  * Collects the deduplicated GROUP BY key names in order of first appearance and, for GROUPING SETS,
  * the used and missing keys of every grouping set. The collected data is forwarded to the overload
  * that takes aggregation keys and grouping sets parameters explicitly.
  * If the query node has no GROUP BY, both collections are passed empty.
  */
void resolveGroupingFunctions(QueryTreeNodePtr & query_node, const PlannerContext & planner_context)
{
    auto & query_node_typed = query_node->as<QueryNode &>();

    /// Keys are stored by value on purpose. Views into elements of aggregation_keys would dangle
    /// once aggregation_keys reallocates, because short strings keep their characters inline.
    NameSet used_aggregation_keys;
    Names aggregation_keys;
    GroupingSetsParamsList grouping_sets_parameters_list;
    QueryTreeNodeToName group_by_node_to_name;

    /// Add expressions from GROUP BY
    if (query_node_typed.hasGroupBy())
    {
        if (query_node_typed.isGroupByWithGroupingSets())
        {
            /// First pass: record used keys of every grouping set and build the global list of unique keys
            for (const auto & grouping_set_keys_list_node : query_node_typed.getGroupBy().getNodes())
            {
                auto & grouping_set_keys_list_node_typed = grouping_set_keys_list_node->as<ListNode &>();
                grouping_sets_parameters_list.emplace_back();
                auto & grouping_sets_parameters = grouping_sets_parameters_list.back();

                for (auto & grouping_set_key_node : grouping_set_keys_list_node_typed.getNodes())
                {
                    auto grouping_set_key_name = calculateActionNodeName(grouping_set_key_node, planner_context, group_by_node_to_name);
                    grouping_sets_parameters.used_keys.push_back(grouping_set_key_name);

                    /// Key was already seen in this or a previous grouping set
                    if (!used_aggregation_keys.insert(grouping_set_key_name).second)
                        continue;

                    aggregation_keys.push_back(grouping_set_key_name);
                }
            }

            /// Second pass: deduplicate used keys of every grouping set and compute keys missing from it
            for (auto & grouping_sets_parameter : grouping_sets_parameters_list)
            {
                NameSet grouping_sets_used_keys;
                Names grouping_sets_keys;

                for (const auto & key : grouping_sets_parameter.used_keys)
                {
                    auto [_, inserted] = grouping_sets_used_keys.insert(key);
                    if (inserted)
                        grouping_sets_keys.push_back(key);
                }

                for (const auto & key : aggregation_keys)
                {
                    if (grouping_sets_used_keys.contains(key))
                        continue;

                    grouping_sets_parameter.missing_keys.push_back(key);
                }

                grouping_sets_parameter.used_keys = std::move(grouping_sets_keys);
            }

            /// Execution layer expects that if there is only 1 grouping set, it is removed
            if (grouping_sets_parameters_list.size() == 1)
                grouping_sets_parameters_list.clear();
        }
        else
        {
            for (auto & group_by_key_node : query_node_typed.getGroupBy().getNodes())
            {
                auto group_by_key_name = calculateActionNodeName(group_by_key_node, planner_context, group_by_node_to_name);

                /// Skip keys that were already added
                if (!used_aggregation_keys.insert(group_by_key_name).second)
                    continue;

                aggregation_keys.push_back(group_by_key_name);
            }
        }
    }

    resolveGroupingFunctions(query_node, aggregation_keys, grouping_sets_parameters_list, planner_context);
}
AggregateDescriptions extractAggregateDescriptions(const QueryTreeNodes & aggregate_function_nodes, const PlannerContext & planner_context)
{
QueryTreeNodeToName node_to_name;

View File

@ -13,11 +13,10 @@ namespace DB
/** Resolve GROUPING functions in query node.
* GROUPING function is replaced with specialized GROUPING function based on GROUP BY modifiers.
* For ROLLUP, CUBE, GROUPING SETS specialized GROUPING function take special __grouping_set column as argument.
* For ROLLUP, CUBE, GROUPING SETS specialized GROUPING function take special __grouping_set column as argument
* and previous GROUPING function arguments.
*/
void resolveGroupingFunctions(QueryTreeNodePtr & query_node,
const Names & aggregation_keys,
const GroupingSetsParamsList & grouping_sets_parameters_list,
const PlannerContext & planner_context);
/// Extract aggregate descriptions from aggregate function nodes

View File

@ -52,7 +52,7 @@ FilterAnalysisResult analyzeFilter(const QueryTreeNodePtr & filter_expression_no
/** Construct aggregation analysis result if query tree has GROUP BY or aggregates.
* Actions before aggregation are added into the actions chain if the returned optional is not empty.
*/
std::optional<AggregationAnalysisResult> analyzeAggregation(QueryTreeNodePtr & query_tree,
std::optional<AggregationAnalysisResult> analyzeAggregation(const QueryTreeNodePtr & query_tree,
const ColumnsWithTypeAndName & join_tree_input_columns,
const PlannerContextPtr & planner_context,
ActionsChain & actions_chain)
@ -193,8 +193,6 @@ std::optional<AggregationAnalysisResult> analyzeAggregation(QueryTreeNodePtr & q
if (query_node.isGroupByWithRollup() || query_node.isGroupByWithCube() || (query_node.isGroupByWithGroupingSets() && !disable_grouping_sets))
aggregates_columns.emplace_back(nullptr, std::make_shared<DataTypeUInt64>(), "__grouping_set");
resolveGroupingFunctions(query_tree, aggregation_keys, grouping_sets_parameters_list, *planner_context);
/// Only aggregation keys and aggregates are available for next steps after GROUP BY step
auto aggregate_step = std::make_unique<ActionsChainStep>(before_aggregation_actions, ActionsChainStep::AvailableOutputColumnsStrategy::OUTPUT_NODES, aggregates_columns);
actions_chain.addStep(std::move(aggregate_step));
@ -212,7 +210,7 @@ std::optional<AggregationAnalysisResult> analyzeAggregation(QueryTreeNodePtr & q
/** Construct window analysis result if query tree has window functions.
* Actions before window functions are added into the actions chain if the returned optional is not empty.
*/
std::optional<WindowAnalysisResult> analyzeWindow(QueryTreeNodePtr & query_tree,
std::optional<WindowAnalysisResult> analyzeWindow(const QueryTreeNodePtr & query_tree,
const ColumnsWithTypeAndName & join_tree_input_columns,
const PlannerContextPtr & planner_context,
ActionsChain & actions_chain)
@ -417,7 +415,7 @@ LimitByAnalysisResult analyzeLimitBy(const QueryNode & query_node,
}
PlannerExpressionsAnalysisResult buildExpressionAnalysisResult(QueryTreeNodePtr query_tree,
PlannerExpressionsAnalysisResult buildExpressionAnalysisResult(const QueryTreeNodePtr & query_tree,
const ColumnsWithTypeAndName & join_tree_input_columns,
const PlannerContextPtr & planner_context)
{
@ -463,14 +461,8 @@ PlannerExpressionsAnalysisResult buildExpressionAnalysisResult(QueryTreeNodePtr
project_names_actions->project(projection_analysis_result.projection_column_names_with_display_aliases);
actions_chain.addStep(std::make_unique<ActionsChainStep>(project_names_actions));
// std::cout << "Chain dump before finalize" << std::endl;
// std::cout << actions_chain.dump() << std::endl;
actions_chain.finalize();
// std::cout << "Chain dump after finalize" << std::endl;
// std::cout << actions_chain.dump() << std::endl;
projection_analysis_result.project_names_actions = std::move(project_names_actions);
PlannerExpressionsAnalysisResult expressions_analysis_result(std::move(projection_analysis_result));

View File

@ -168,7 +168,7 @@ private:
};
/// Build expression analysis result for query tree, join tree input columns and planner context
PlannerExpressionsAnalysisResult buildExpressionAnalysisResult(QueryTreeNodePtr query_tree,
PlannerExpressionsAnalysisResult buildExpressionAnalysisResult(const QueryTreeNodePtr & query_tree,
const ColumnsWithTypeAndName & join_tree_input_columns,
const PlannerContextPtr & planner_context);

View File

@ -248,7 +248,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(const QueryTreeNodePtr & tabl
if (max_streams == 0)
max_streams = 1;
/// If necessary, we request more sources than the number of threads - to distribute the work evenly over the threads.
/// If necessary, we request more sources than the number of threads - to distribute the work evenly over the threads
if (max_streams > 1 && !is_remote)
max_streams = static_cast<size_t>(max_streams * settings.max_streams_to_max_threads_ratio);
@ -841,7 +841,7 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
std::vector<ColumnIdentifierSet> table_expressions_outer_scope_columns(table_expressions_stack_size);
ColumnIdentifierSet current_outer_scope_columns = outer_scope_columns;
for (Int64 i = table_expressions_stack_size - 1; i >= 0; --i)
for (Int64 i = static_cast<Int64>(table_expressions_stack_size) - 1; i >= 0; --i)
{
table_expressions_outer_scope_columns[i] = current_outer_scope_columns;
@ -859,7 +859,8 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
{
if (query_plans_stack.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Expected at least 1 query plan on stack before ARRAY JOIN processing");
"Expected at least 1 query plan on stack before ARRAY JOIN processing. Actual {}",
query_plans_stack.size());
auto query_plan = std::move(query_plans_stack.back());
query_plans_stack.back() = buildQueryPlanForArrayJoinNode(table_expression,
@ -868,11 +869,10 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
}
else if (auto * join_node = table_expression->as<JoinNode>())
{
size_t table_expressions_column_nodes_with_names_stack_size = query_plans_stack.size();
if (table_expressions_column_nodes_with_names_stack_size < 2)
if (query_plans_stack.size() < 2)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Expected at least 2 query plans on stack before JOIN processing. Actual {}",
table_expressions_column_nodes_with_names_stack_size);
query_plans_stack.size());
auto right_query_plan = std::move(query_plans_stack.back());
query_plans_stack.pop_back();
@ -901,8 +901,10 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
}
}
if (query_plans_stack.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected at least 1 query plan for JOIN TREE");
if (query_plans_stack.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Expected 1 query plan for JOIN TREE. Actual {}",
query_plans_stack.size());
return std::move(query_plans_stack.back());
}

View File

@ -578,9 +578,9 @@ std::optional<QueryProcessingStage::Enum> StorageDistributed::getOptimizedQueryP
bool has_aggregates = query_info.has_aggregates;
if (query_info.syntax_analyzer_result)
has_aggregates = query_info.syntax_analyzer_result->aggregates.empty();
has_aggregates = !query_info.syntax_analyzer_result->aggregates.empty();
if (!has_aggregates || group_by)
if (has_aggregates || group_by)
{
if (!optimize_sharding_key_aggregation || !group_by || !expr_contains_sharding_key(group_by->children))
return {};

View File

@ -0,0 +1,10 @@
45 0 0 0 1
45 0 1 0 1
45 0 2 0 1
45 0 3 0 1
45 0 4 0 1
45 0 5 0 1
45 0 6 0 1
45 0 7 0 1
45 0 8 0 1
45 0 9 0 1

View File

@ -0,0 +1,20 @@
-- Runs with the experimental analyzer enabled.
SET allow_experimental_analyzer = 1;
-- Uses grouping() both in the projection and inside the PARTITION BY of a window function,
-- on top of GROUP BY ... WITH ROLLUP over a cross join of three numbers(10) tables.
-- l is the sum of grouping() over both GROUP BY keys; r is rank() by sum(a.number) descending
-- within partitions built from the grouping() values.
SELECT
sum(a.number) AS total,
c.number AS cn,
b.number AS bn,
grouping(c.number) + grouping(b.number) AS l,
rank() OVER (PARTITION BY grouping(c.number) + grouping(b.number), multiIf(grouping(c.number) = 0, b.number, NULL) ORDER BY sum(a.number) DESC) AS r
FROM numbers(10) AS a, numbers(10) AS b, numbers(10) AS c
GROUP BY
cn,
bn
WITH ROLLUP
ORDER BY
total ASC,
cn ASC,
bn ASC,
l ASC,
r ASC
LIMIT 10;