This commit is contained in:
Nickita Taranov 2022-03-26 01:36:06 +01:00
parent a08c035443
commit eedcd61479
4 changed files with 73 additions and 51 deletions

View File

@ -9,7 +9,7 @@
namespace DB::QueryPlanOptimizations namespace DB::QueryPlanOptimizations
{ {
void swapSortingAndUnnecessaryCalculation(QueryPlan::Node * parent_node, ActionsDAGPtr && actions) void swapSortingAndUnnecessaryCalculation(QueryPlan::Node * parent_node, ActionsDAGPtr && unneeded_for_sorting)
{ {
QueryPlan::Node * child_node = parent_node->children.front(); QueryPlan::Node * child_node = parent_node->children.front();
@ -17,14 +17,24 @@ void swapSortingAndUnnecessaryCalculation(QueryPlan::Node * parent_node, Actions
auto & child_step = child_node->step; auto & child_step = child_node->step;
auto * sorting_step = typeid_cast<SortingStep *>(parent_step.get()); auto * sorting_step = typeid_cast<SortingStep *>(parent_step.get());
// Sorting -> UnnecessaryCalculations // Sorting -> Expression
std::swap(parent_step, child_step); std::swap(parent_step, child_step);
// UnnecessaryCalculations -> Sorting // Expression -> Sorting
sorting_step->updateInputStream(child_node->children.at(0)->step->getOutputStream()); sorting_step->updateInputStream(child_node->children.at(0)->step->getOutputStream());
auto input_header = child_step->getInputStreams().at(0).header; LOG_TRACE(
&Poco::Logger::get("Optimizer"), "New Sorting input header: {}", sorting_step->getInputStreams().at(0).header.dumpStructure());
auto input_header = sorting_step->getInputStreams().at(0).header;
LOG_TRACE(&Poco::Logger::get("Optimizer"), "Old Sorting output header: {}", sorting_step->getOutputStream().header.dumpStructure());
sorting_step->updateOutputStream(std::move(input_header)); sorting_step->updateOutputStream(std::move(input_header));
parent_step = std::make_unique<ExpressionStep>(child_step->getOutputStream(), std::move(actions)); LOG_TRACE(&Poco::Logger::get("Optimizer"), "New Sorting output header: {}", sorting_step->getOutputStream().header.dumpStructure());
auto description = parent_node->step->getStepDescription();
parent_step = std::make_unique<ExpressionStep>(child_step->getOutputStream(), std::move(unneeded_for_sorting));
LOG_TRACE(
&Poco::Logger::get("Optimizer"), "New Expression input header: {}", parent_step->getInputStreams().at(0).header.dumpStructure());
LOG_TRACE(&Poco::Logger::get("Optimizer"), "New Expression output header: {}", parent_step->getOutputStream().header.dumpStructure());
parent_step->setStepDescription(description + " [lifted up part]");
// UnneededCalculations -> Sorting
} }
size_t tryExecuteFunctionsAfterSorting(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes) size_t tryExecuteFunctionsAfterSorting(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes)
@ -46,31 +56,35 @@ size_t tryExecuteFunctionsAfterSorting(QueryPlan::Node * parent_node, QueryPlan:
for (const auto & col : sorting_step->getSortDescription()) for (const auto & col : sorting_step->getSortDescription())
sort_columns.insert(col.column_name); sort_columns.insert(col.column_name);
const auto & expression = expression_step->getExpression(); const auto & expression = expression_step->getExpression();
auto split_actions = expression->splitActionsBySortingDescription(sort_columns); auto [needed_for_sorting, unneeded_for_sorting] = expression->splitActionsBySortingDescription(sort_columns);
LOG_TRACE(&Poco::Logger::get("Optimizer"), "source: {}", expression->dumpDAG()); LOG_TRACE(&Poco::Logger::get("Optimizer"), "Original Expression: {}", expression->dumpDAG());
LOG_TRACE(&Poco::Logger::get("Optimizer"), "first: {}", split_actions.first->dumpDAG()); LOG_TRACE(&Poco::Logger::get("Optimizer"), "Needed for Sorting: {}", needed_for_sorting->dumpDAG());
LOG_TRACE(&Poco::Logger::get("Optimizer"), "second: {}", split_actions.second->dumpDAG()); LOG_TRACE(&Poco::Logger::get("Optimizer"), "Unneeded for Sorting: {}", unneeded_for_sorting->dumpDAG());
auto description = child_step->getStepDescription();
// No calculations can be postponed. // No calculations can be postponed.
if (split_actions.second->trivial()) if (unneeded_for_sorting->trivial())
return 0; return 0;
// Everything can be done after the sorting. // Everything can be done after the sorting.
if (split_actions.first->trivial()) /*if (needed_for_sorting->trivial())
{ {
swapSortingAndUnnecessaryCalculation(parent_node, std::move(split_actions.second)); swapSortingAndUnnecessaryCalculation(parent_node, std::move(unneeded_for_sorting));
return 2; return 2;
} }*/
// Sorting -> Expression // Sorting (parent_node) -> Expression (child_node)
auto & node = nodes.emplace_back(); auto & node_with_needed = nodes.emplace_back();
node.children.swap(child_node->children); node_with_needed.children.swap(child_node->children);
child_node->children.emplace_back(&node); child_node->children.emplace_back(&node_with_needed);
node.step = std::make_unique<ExpressionStep>(node.children.at(0)->step->getOutputStream(), std::move(split_actions.first)); node_with_needed.step
= std::make_unique<ExpressionStep>(node_with_needed.children.at(0)->step->getOutputStream(), std::move(needed_for_sorting));
node_with_needed.step->setStepDescription(std::move(description));
// Sorting (parent_node) -> UnnecessaryCalculations (child_node) -> NecessaryCalculations (node) // Sorting (parent_node) -> so far the origin Expression (child_node) -> NeededCalculations (node_with_needed)
swapSortingAndUnnecessaryCalculation(parent_node, std::move(split_actions.second)); swapSortingAndUnnecessaryCalculation(parent_node, std::move(unneeded_for_sorting));
// UnnecessaryCalculations (child_node) -> Sorting (parent_node) -> NecessaryCalculations (node) // UneededCalculations (child_node) -> Sorting (parent_node) -> NeededCalculations (node_with_needed)
return 3; return 3;
} }

View File

@ -98,6 +98,7 @@ void SortingStep::updateInputStream(DataStream input_stream)
void SortingStep::updateOutputStream(Block result_header) void SortingStep::updateOutputStream(Block result_header)
{ {
output_stream = createOutputStream(input_streams.front(), std::move(result_header), getDataStreamTraits()); output_stream = createOutputStream(input_streams.front(), std::move(result_header), getDataStreamTraits());
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
} }
void SortingStep::updateLimit(size_t limit_) void SortingStep::updateLimit(size_t limit_)

View File

@ -35,6 +35,7 @@ Expression (Projection)
ReadFromMergeTree (default.test_table) ReadFromMergeTree (default.test_table)
Expression (Projection) Expression (Projection)
Limit (preliminary LIMIT (without OFFSET)) Limit (preliminary LIMIT (without OFFSET))
Expression (Before ORDER BY [lifted up part])
Sorting Sorting
Expression (Before ORDER BY) Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage) SettingQuotaAndLimits (Set limits and quota after reading from storage)

View File

@ -7,6 +7,8 @@
ExpressionTransform ExpressionTransform
(Limit) (Limit)
Limit Limit
(Expression)
ExpressionTransform
(Sorting) (Sorting)
MergingSortedTransform 2 → 1 MergingSortedTransform 2 → 1
(Expression) (Expression)
@ -23,6 +25,8 @@ ExpressionTransform
ExpressionTransform ExpressionTransform
(Limit) (Limit)
Limit Limit
(Expression)
ExpressionTransform
(Sorting) (Sorting)
MergingSortedTransform 2 → 1 MergingSortedTransform 2 → 1
(Expression) (Expression)
@ -42,6 +46,8 @@ ExpressionTransform
ExpressionTransform ExpressionTransform
(Limit) (Limit)
Limit Limit
(Expression)
ExpressionTransform
(Sorting) (Sorting)
FinishSortingTransform FinishSortingTransform
PartialSortingTransform PartialSortingTransform