mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #19725 from ClickHouse/limit-push-down
Update QueryPlan tree optimization traverse.
This commit is contained in:
commit
f7dc83ae96
@ -190,6 +190,7 @@ add_object_library(clickhouse_processors_sources Processors/Sources)
|
||||
add_object_library(clickhouse_processors_merges Processors/Merges)
|
||||
add_object_library(clickhouse_processors_merges_algorithms Processors/Merges/Algorithms)
|
||||
add_object_library(clickhouse_processors_queryplan Processors/QueryPlan)
|
||||
add_object_library(clickhouse_processors_queryplan_optimizations Processors/QueryPlan/Optimizations)
|
||||
|
||||
set (DBMS_COMMON_LIBRARIES)
|
||||
# libgcc_s does not provide an implementation of an atomics library. Instead,
|
||||
|
@ -609,10 +609,10 @@ bool ActionsDAG::hasStatefulFunctions() const
|
||||
return false;
|
||||
}
|
||||
|
||||
bool ActionsDAG::empty() const
|
||||
bool ActionsDAG::trivial() const
|
||||
{
|
||||
for (const auto & node : nodes)
|
||||
if (node.type != ActionType::INPUT)
|
||||
if (node.type == ActionType::FUNCTION || node.type == ActionType::ARRAY_JOIN)
|
||||
return false;
|
||||
|
||||
return true;
|
||||
|
@ -223,7 +223,7 @@ public:
|
||||
|
||||
bool hasArrayJoin() const;
|
||||
bool hasStatefulFunctions() const;
|
||||
bool empty() const; /// If actions only contain inputs.
|
||||
bool trivial() const; /// If actions has no functions or array join.
|
||||
|
||||
const ActionsSettings & getSettings() const { return settings; }
|
||||
|
||||
|
56
src/Processors/QueryPlan/Optimizations/Optimizations.h
Normal file
56
src/Processors/QueryPlan/Optimizations/Optimizations.h
Normal file
@ -0,0 +1,56 @@
|
||||
#pragma once
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
#include <array>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace QueryPlanOptimizations
|
||||
{
|
||||
|
||||
/// This is the main function which optimizes the whole QueryPlan tree.
|
||||
void optimizeTree(QueryPlan::Node & root, QueryPlan::Nodes & nodes);
|
||||
|
||||
/// Optimization is a function applied to QueryPlan::Node.
|
||||
/// It can read and update subtree of specified node.
|
||||
/// It return the number of updated layers of subtree if some change happened.
|
||||
/// It must guarantee that the structure of tree is correct.
|
||||
///
|
||||
/// New nodes should be added to QueryPlan::Nodes list.
|
||||
/// It is not needed to remove old nodes from the list.
|
||||
struct Optimization
|
||||
{
|
||||
using Function = size_t (*)(QueryPlan::Node *, QueryPlan::Nodes &);
|
||||
const Function apply = nullptr;
|
||||
const char * name;
|
||||
};
|
||||
|
||||
/// Move ARRAY JOIN up if possible.
|
||||
size_t tryLiftUpArrayJoin(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
|
||||
|
||||
/// Move LimitStep down if possible.
|
||||
size_t tryPushDownLimit(QueryPlan::Node * parent_node, QueryPlan::Nodes &);
|
||||
|
||||
/// Split FilterStep into chain `ExpressionStep -> FilterStep`, where FilterStep contains minimal number of nodes.
|
||||
size_t trySplitFilter(QueryPlan::Node * node, QueryPlan::Nodes & nodes);
|
||||
|
||||
/// Replace chain `ExpressionStep -> ExpressionStep` to single ExpressionStep
|
||||
/// Replace chain `FilterStep -> ExpressionStep` to single FilterStep
|
||||
size_t tryMergeExpressions(QueryPlan::Node * parent_node, QueryPlan::Nodes &);
|
||||
|
||||
inline const auto & getOptimizations()
|
||||
{
|
||||
static const std::array<Optimization, 4> optimizations =
|
||||
{{
|
||||
{tryLiftUpArrayJoin, "liftUpArrayJoin"},
|
||||
{tryPushDownLimit, "pushDownLimit"},
|
||||
{trySplitFilter, "splitFilter"},
|
||||
{tryMergeExpressions, "mergeExpressions"},
|
||||
}};
|
||||
|
||||
return optimizations;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
85
src/Processors/QueryPlan/Optimizations/liftUpArrayJoin.cpp
Normal file
85
src/Processors/QueryPlan/Optimizations/liftUpArrayJoin.cpp
Normal file
@ -0,0 +1,85 @@
|
||||
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
|
||||
#include <Processors/QueryPlan/FilterStep.h>
|
||||
#include <Processors/QueryPlan/ExpressionStep.h>
|
||||
#include <Processors/QueryPlan/ArrayJoinStep.h>
|
||||
#include <Interpreters/ActionsDAG.h>
|
||||
#include <Interpreters/ArrayJoinAction.h>
|
||||
|
||||
namespace DB::QueryPlanOptimizations
|
||||
{
|
||||
|
||||
size_t tryLiftUpArrayJoin(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes)
|
||||
{
|
||||
if (parent_node->children.size() != 1)
|
||||
return 0;
|
||||
|
||||
QueryPlan::Node * child_node = parent_node->children.front();
|
||||
|
||||
auto & parent = parent_node->step;
|
||||
auto & child = child_node->step;
|
||||
auto * expression_step = typeid_cast<ExpressionStep *>(parent.get());
|
||||
auto * filter_step = typeid_cast<FilterStep *>(parent.get());
|
||||
auto * array_join_step = typeid_cast<ArrayJoinStep *>(child.get());
|
||||
|
||||
if (!(expression_step || filter_step) || !array_join_step)
|
||||
return 0;
|
||||
|
||||
const auto & array_join = array_join_step->arrayJoin();
|
||||
const auto & expression = expression_step ? expression_step->getExpression()
|
||||
: filter_step->getExpression();
|
||||
|
||||
auto split_actions = expression->splitActionsBeforeArrayJoin(array_join->columns);
|
||||
|
||||
/// No actions can be moved before ARRAY JOIN.
|
||||
if (split_actions.first->trivial())
|
||||
return 0;
|
||||
|
||||
auto description = parent->getStepDescription();
|
||||
|
||||
/// All actions was moved before ARRAY JOIN. Swap Expression and ArrayJoin.
|
||||
if (split_actions.second->trivial())
|
||||
{
|
||||
auto expected_header = parent->getOutputStream().header;
|
||||
|
||||
/// Expression/Filter -> ArrayJoin
|
||||
std::swap(parent, child);
|
||||
/// ArrayJoin -> Expression/Filter
|
||||
|
||||
if (expression_step)
|
||||
child = std::make_unique<ExpressionStep>(child_node->children.at(0)->step->getOutputStream(),
|
||||
std::move(split_actions.first));
|
||||
else
|
||||
child = std::make_unique<FilterStep>(child_node->children.at(0)->step->getOutputStream(),
|
||||
std::move(split_actions.first),
|
||||
filter_step->getFilterColumnName(),
|
||||
filter_step->removesFilterColumn());
|
||||
|
||||
child->setStepDescription(std::move(description));
|
||||
|
||||
array_join_step->updateInputStream(child->getOutputStream(), expected_header);
|
||||
return 2;
|
||||
}
|
||||
|
||||
/// Add new expression step before ARRAY JOIN.
|
||||
/// Expression/Filter -> ArrayJoin -> Something
|
||||
auto & node = nodes.emplace_back();
|
||||
node.children.swap(child_node->children);
|
||||
child_node->children.emplace_back(&node);
|
||||
/// Expression/Filter -> ArrayJoin -> node -> Something
|
||||
|
||||
node.step = std::make_unique<ExpressionStep>(node.children.at(0)->step->getOutputStream(),
|
||||
std::move(split_actions.first));
|
||||
node.step->setStepDescription(description);
|
||||
array_join_step->updateInputStream(node.step->getOutputStream(), {});
|
||||
|
||||
if (expression_step)
|
||||
parent = std::make_unique<ExpressionStep>(array_join_step->getOutputStream(), split_actions.second);
|
||||
else
|
||||
parent = std::make_unique<FilterStep>(array_join_step->getOutputStream(), split_actions.second,
|
||||
filter_step->getFilterColumnName(), filter_step->removesFilterColumn());
|
||||
|
||||
parent->setStepDescription(description + " [split]");
|
||||
return 3;
|
||||
}
|
||||
|
||||
}
|
114
src/Processors/QueryPlan/Optimizations/limitPushDown.cpp
Normal file
114
src/Processors/QueryPlan/Optimizations/limitPushDown.cpp
Normal file
@ -0,0 +1,114 @@
|
||||
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
|
||||
#include <Processors/QueryPlan/ITransformingStep.h>
|
||||
#include <Processors/QueryPlan/LimitStep.h>
|
||||
#include <Processors/QueryPlan/TotalsHavingStep.h>
|
||||
#include <Processors/QueryPlan/MergingSortedStep.h>
|
||||
#include <Processors/QueryPlan/FinishSortingStep.h>
|
||||
#include <Processors/QueryPlan/MergeSortingStep.h>
|
||||
#include <Processors/QueryPlan/PartialSortingStep.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
|
||||
namespace DB::QueryPlanOptimizations
|
||||
{
|
||||
|
||||
/// If plan looks like Limit -> Sorting, update limit for Sorting
|
||||
static bool tryUpdateLimitForSortingSteps(QueryPlan::Node * node, size_t limit)
|
||||
{
|
||||
if (limit == 0)
|
||||
return false;
|
||||
|
||||
QueryPlanStepPtr & step = node->step;
|
||||
QueryPlan::Node * child = nullptr;
|
||||
bool updated = false;
|
||||
|
||||
if (auto * merging_sorted = typeid_cast<MergingSortedStep *>(step.get()))
|
||||
{
|
||||
/// TODO: remove LimitStep here.
|
||||
merging_sorted->updateLimit(limit);
|
||||
updated = true;
|
||||
child = node->children.front();
|
||||
}
|
||||
else if (auto * finish_sorting = typeid_cast<FinishSortingStep *>(step.get()))
|
||||
{
|
||||
/// TODO: remove LimitStep here.
|
||||
finish_sorting->updateLimit(limit);
|
||||
updated = true;
|
||||
}
|
||||
else if (auto * merge_sorting = typeid_cast<MergeSortingStep *>(step.get()))
|
||||
{
|
||||
merge_sorting->updateLimit(limit);
|
||||
updated = true;
|
||||
child = node->children.front();
|
||||
}
|
||||
else if (auto * partial_sorting = typeid_cast<PartialSortingStep *>(step.get()))
|
||||
{
|
||||
partial_sorting->updateLimit(limit);
|
||||
updated = true;
|
||||
}
|
||||
|
||||
/// We often have chain PartialSorting -> MergeSorting -> MergingSorted
|
||||
/// Try update limit for them also if possible.
|
||||
if (child)
|
||||
tryUpdateLimitForSortingSteps(child, limit);
|
||||
|
||||
return updated;
|
||||
}
|
||||
|
||||
size_t tryPushDownLimit(QueryPlan::Node * parent_node, QueryPlan::Nodes &)
|
||||
{
|
||||
if (parent_node->children.size() != 1)
|
||||
return 0;
|
||||
|
||||
QueryPlan::Node * child_node = parent_node->children.front();
|
||||
|
||||
auto & parent = parent_node->step;
|
||||
auto & child = child_node->step;
|
||||
auto * limit = typeid_cast<LimitStep *>(parent.get());
|
||||
|
||||
if (!limit)
|
||||
return 0;
|
||||
|
||||
/// Skip LIMIT WITH TIES by now.
|
||||
if (limit->withTies())
|
||||
return 0;
|
||||
|
||||
const auto * transforming = dynamic_cast<const ITransformingStep *>(child.get());
|
||||
|
||||
/// Skip everything which is not transform.
|
||||
if (!transforming)
|
||||
return 0;
|
||||
|
||||
/// Special cases for sorting steps.
|
||||
if (tryUpdateLimitForSortingSteps(child_node, limit->getLimitForSorting()))
|
||||
return 0;
|
||||
|
||||
/// Special case for TotalsHaving. Totals may be incorrect if we push down limit.
|
||||
if (typeid_cast<const TotalsHavingStep *>(child.get()))
|
||||
return 0;
|
||||
|
||||
/// Now we should decide if pushing down limit possible for this step.
|
||||
|
||||
const auto & transform_traits = transforming->getTransformTraits();
|
||||
const auto & data_stream_traits = transforming->getDataStreamTraits();
|
||||
|
||||
/// Cannot push down if child changes the number of rows.
|
||||
if (!transform_traits.preserves_number_of_rows)
|
||||
return 0;
|
||||
|
||||
/// Cannot push down if data was sorted exactly by child stream.
|
||||
if (!child->getOutputStream().sort_description.empty() && !data_stream_traits.preserves_sorting)
|
||||
return 0;
|
||||
|
||||
/// Now we push down limit only if it doesn't change any stream properties.
|
||||
/// TODO: some of them may be changed and, probably, not important for following streams. We may add such info.
|
||||
if (!limit->getOutputStream().hasEqualPropertiesWith(transforming->getOutputStream()))
|
||||
return 0;
|
||||
|
||||
/// Input stream for Limit have changed.
|
||||
limit->updateInputStream(transforming->getInputStreams().front());
|
||||
|
||||
parent.swap(child);
|
||||
return 2;
|
||||
}
|
||||
|
||||
}
|
65
src/Processors/QueryPlan/Optimizations/mergeExpressions.cpp
Normal file
65
src/Processors/QueryPlan/Optimizations/mergeExpressions.cpp
Normal file
@ -0,0 +1,65 @@
|
||||
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
|
||||
#include <Processors/QueryPlan/FilterStep.h>
|
||||
#include <Processors/QueryPlan/ExpressionStep.h>
|
||||
#include <Interpreters/ActionsDAG.h>
|
||||
|
||||
namespace DB::QueryPlanOptimizations
|
||||
{
|
||||
|
||||
size_t tryMergeExpressions(QueryPlan::Node * parent_node, QueryPlan::Nodes &)
|
||||
{
|
||||
if (parent_node->children.size() != 1)
|
||||
return false;
|
||||
|
||||
QueryPlan::Node * child_node = parent_node->children.front();
|
||||
|
||||
auto & parent = parent_node->step;
|
||||
auto & child = child_node->step;
|
||||
|
||||
auto * parent_expr = typeid_cast<ExpressionStep *>(parent.get());
|
||||
auto * parent_filter = typeid_cast<FilterStep *>(parent.get());
|
||||
auto * child_expr = typeid_cast<ExpressionStep *>(child.get());
|
||||
|
||||
if (parent_expr && child_expr)
|
||||
{
|
||||
const auto & child_actions = child_expr->getExpression();
|
||||
const auto & parent_actions = parent_expr->getExpression();
|
||||
|
||||
/// We cannot combine actions with arrayJoin and stateful function because we not always can reorder them.
|
||||
/// Example: select rowNumberInBlock() from (select arrayJoin([1, 2]))
|
||||
/// Such a query will return two zeroes if we combine actions together.
|
||||
if (child_actions->hasArrayJoin() && parent_actions->hasStatefulFunctions())
|
||||
return 0;
|
||||
|
||||
auto merged = ActionsDAG::merge(std::move(*child_actions), std::move(*parent_actions));
|
||||
|
||||
auto expr = std::make_unique<ExpressionStep>(child_expr->getInputStreams().front(), merged);
|
||||
expr->setStepDescription("(" + parent_expr->getStepDescription() + " + " + child_expr->getStepDescription() + ")");
|
||||
|
||||
parent_node->step = std::move(expr);
|
||||
parent_node->children.swap(child_node->children);
|
||||
return 1;
|
||||
}
|
||||
else if (parent_filter && child_expr)
|
||||
{
|
||||
const auto & child_actions = child_expr->getExpression();
|
||||
const auto & parent_actions = parent_filter->getExpression();
|
||||
|
||||
if (child_actions->hasArrayJoin() && parent_actions->hasStatefulFunctions())
|
||||
return 0;
|
||||
|
||||
auto merged = ActionsDAG::merge(std::move(*child_actions), std::move(*parent_actions));
|
||||
|
||||
auto filter = std::make_unique<FilterStep>(child_expr->getInputStreams().front(), merged,
|
||||
parent_filter->getFilterColumnName(), parent_filter->removesFilterColumn());
|
||||
filter->setStepDescription("(" + parent_filter->getStepDescription() + " + " + child_expr->getStepDescription() + ")");
|
||||
|
||||
parent_node->step = std::move(filter);
|
||||
parent_node->children.swap(child_node->children);
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
75
src/Processors/QueryPlan/Optimizations/optimizeTree.cpp
Normal file
75
src/Processors/QueryPlan/Optimizations/optimizeTree.cpp
Normal file
@ -0,0 +1,75 @@
|
||||
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
|
||||
#include <stack>
|
||||
|
||||
namespace DB::QueryPlanOptimizations
|
||||
{
|
||||
|
||||
void optimizeTree(QueryPlan::Node & root, QueryPlan::Nodes & nodes)
|
||||
{
|
||||
const auto & optimizations = getOptimizations();
|
||||
|
||||
struct Frame
|
||||
{
|
||||
QueryPlan::Node * node;
|
||||
|
||||
/// If not zero, traverse only depth_limit layers of tree (if no other optimizations happen).
|
||||
/// Otherwise, traverse all children.
|
||||
size_t depth_limit = 0;
|
||||
|
||||
/// Next child to process.
|
||||
size_t next_child = 0;
|
||||
};
|
||||
|
||||
std::stack<Frame> stack;
|
||||
stack.push(Frame{.node = &root});
|
||||
|
||||
while (!stack.empty())
|
||||
{
|
||||
auto & frame = stack.top();
|
||||
|
||||
/// If traverse_depth_limit == 0, then traverse without limit (first entrance)
|
||||
/// If traverse_depth_limit > 1, then traverse with (limit - 1)
|
||||
if (frame.depth_limit != 1)
|
||||
{
|
||||
/// Traverse all children first.
|
||||
if (frame.next_child < frame.node->children.size())
|
||||
{
|
||||
stack.push(Frame
|
||||
{
|
||||
.node = frame.node->children[frame.next_child],
|
||||
.depth_limit = frame.depth_limit ? (frame.depth_limit - 1) : 0,
|
||||
});
|
||||
|
||||
++frame.next_child;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
size_t max_update_depth = 0;
|
||||
|
||||
/// Apply all optimizations.
|
||||
for (const auto & optimization : optimizations)
|
||||
{
|
||||
/// Just in case, skip optimization if it is not initialized.
|
||||
if (!optimization.apply)
|
||||
continue;
|
||||
|
||||
/// Try to apply optimization.
|
||||
auto update_depth = optimization.apply(frame.node, nodes);
|
||||
max_update_depth = std::max<size_t>(max_update_depth, update_depth);
|
||||
}
|
||||
|
||||
/// Traverse `max_update_depth` layers of tree again.
|
||||
if (max_update_depth)
|
||||
{
|
||||
frame.depth_limit = max_update_depth;
|
||||
frame.next_child = 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
/// Nothing was applied.
|
||||
stack.pop();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
50
src/Processors/QueryPlan/Optimizations/splitFilter.cpp
Normal file
50
src/Processors/QueryPlan/Optimizations/splitFilter.cpp
Normal file
@ -0,0 +1,50 @@
|
||||
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
|
||||
#include <Processors/QueryPlan/FilterStep.h>
|
||||
#include <Processors/QueryPlan/ExpressionStep.h>
|
||||
#include <Interpreters/ActionsDAG.h>
|
||||
|
||||
namespace DB::QueryPlanOptimizations
|
||||
{
|
||||
|
||||
/// Split FilterStep into chain `ExpressionStep -> FilterStep`, where FilterStep contains minimal number of nodes.
|
||||
size_t trySplitFilter(QueryPlan::Node * node, QueryPlan::Nodes & nodes)
|
||||
{
|
||||
auto * filter_step = typeid_cast<FilterStep *>(node->step.get());
|
||||
if (!filter_step)
|
||||
return 0;
|
||||
|
||||
const auto & expr = filter_step->getExpression();
|
||||
|
||||
/// Do not split if there are function like runningDifference.
|
||||
if (expr->hasStatefulFunctions())
|
||||
return 0;
|
||||
|
||||
auto split = expr->splitActionsForFilter(filter_step->getFilterColumnName());
|
||||
|
||||
if (split.second->trivial())
|
||||
return 0;
|
||||
|
||||
if (filter_step->removesFilterColumn())
|
||||
split.second->removeUnusedInput(filter_step->getFilterColumnName());
|
||||
|
||||
auto description = filter_step->getStepDescription();
|
||||
|
||||
auto & filter_node = nodes.emplace_back();
|
||||
node->children.swap(filter_node.children);
|
||||
node->children.push_back(&filter_node);
|
||||
|
||||
filter_node.step = std::make_unique<FilterStep>(
|
||||
filter_node.children.at(0)->step->getOutputStream(),
|
||||
std::move(split.first),
|
||||
filter_step->getFilterColumnName(),
|
||||
filter_step->removesFilterColumn());
|
||||
|
||||
node->step = std::make_unique<ExpressionStep>(filter_node.step->getOutputStream(), std::move(split.second));
|
||||
|
||||
filter_node.step->setStepDescription("(" + description + ")[split]");
|
||||
node->step->setStepDescription(description);
|
||||
|
||||
return 2;
|
||||
}
|
||||
|
||||
}
|
@ -6,15 +6,7 @@
|
||||
#include <Interpreters/ActionsDAG.h>
|
||||
#include <Interpreters/ArrayJoinAction.h>
|
||||
#include <stack>
|
||||
#include <Processors/QueryPlan/LimitStep.h>
|
||||
#include "MergingSortedStep.h"
|
||||
#include "FinishSortingStep.h"
|
||||
#include "MergeSortingStep.h"
|
||||
#include "PartialSortingStep.h"
|
||||
#include "TotalsHavingStep.h"
|
||||
#include "ExpressionStep.h"
|
||||
#include "ArrayJoinStep.h"
|
||||
#include "FilterStep.h"
|
||||
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -341,318 +333,9 @@ void QueryPlan::explainPipeline(WriteBuffer & buffer, const ExplainPipelineOptio
|
||||
}
|
||||
}
|
||||
|
||||
/// If plan looks like Limit -> Sorting, update limit for Sorting
|
||||
bool tryUpdateLimitForSortingSteps(QueryPlan::Node * node, size_t limit)
|
||||
{
|
||||
if (limit == 0)
|
||||
return false;
|
||||
|
||||
QueryPlanStepPtr & step = node->step;
|
||||
QueryPlan::Node * child = nullptr;
|
||||
bool updated = false;
|
||||
|
||||
if (auto * merging_sorted = typeid_cast<MergingSortedStep *>(step.get()))
|
||||
{
|
||||
/// TODO: remove LimitStep here.
|
||||
merging_sorted->updateLimit(limit);
|
||||
updated = true;
|
||||
child = node->children.front();
|
||||
}
|
||||
else if (auto * finish_sorting = typeid_cast<FinishSortingStep *>(step.get()))
|
||||
{
|
||||
/// TODO: remove LimitStep here.
|
||||
finish_sorting->updateLimit(limit);
|
||||
updated = true;
|
||||
}
|
||||
else if (auto * merge_sorting = typeid_cast<MergeSortingStep *>(step.get()))
|
||||
{
|
||||
merge_sorting->updateLimit(limit);
|
||||
updated = true;
|
||||
child = node->children.front();
|
||||
}
|
||||
else if (auto * partial_sorting = typeid_cast<PartialSortingStep *>(step.get()))
|
||||
{
|
||||
partial_sorting->updateLimit(limit);
|
||||
updated = true;
|
||||
}
|
||||
|
||||
/// We often have chain PartialSorting -> MergeSorting -> MergingSorted
|
||||
/// Try update limit for them also if possible.
|
||||
if (child)
|
||||
tryUpdateLimitForSortingSteps(child, limit);
|
||||
|
||||
return updated;
|
||||
}
|
||||
|
||||
/// Move LimitStep down if possible.
|
||||
static void tryPushDownLimit(QueryPlanStepPtr & parent, QueryPlan::Node * child_node)
|
||||
{
|
||||
auto & child = child_node->step;
|
||||
auto * limit = typeid_cast<LimitStep *>(parent.get());
|
||||
|
||||
if (!limit)
|
||||
return;
|
||||
|
||||
/// Skip LIMIT WITH TIES by now.
|
||||
if (limit->withTies())
|
||||
return;
|
||||
|
||||
const auto * transforming = dynamic_cast<const ITransformingStep *>(child.get());
|
||||
|
||||
/// Skip everything which is not transform.
|
||||
if (!transforming)
|
||||
return;
|
||||
|
||||
/// Special cases for sorting steps.
|
||||
if (tryUpdateLimitForSortingSteps(child_node, limit->getLimitForSorting()))
|
||||
return;
|
||||
|
||||
/// Special case for TotalsHaving. Totals may be incorrect if we push down limit.
|
||||
if (typeid_cast<const TotalsHavingStep *>(child.get()))
|
||||
return;
|
||||
|
||||
/// Now we should decide if pushing down limit possible for this step.
|
||||
|
||||
const auto & transform_traits = transforming->getTransformTraits();
|
||||
const auto & data_stream_traits = transforming->getDataStreamTraits();
|
||||
|
||||
/// Cannot push down if child changes the number of rows.
|
||||
if (!transform_traits.preserves_number_of_rows)
|
||||
return;
|
||||
|
||||
/// Cannot push down if data was sorted exactly by child stream.
|
||||
if (!child->getOutputStream().sort_description.empty() && !data_stream_traits.preserves_sorting)
|
||||
return;
|
||||
|
||||
/// Now we push down limit only if it doesn't change any stream properties.
|
||||
/// TODO: some of them may be changed and, probably, not important for following streams. We may add such info.
|
||||
if (!limit->getOutputStream().hasEqualPropertiesWith(transforming->getOutputStream()))
|
||||
return;
|
||||
|
||||
/// Input stream for Limit have changed.
|
||||
limit->updateInputStream(transforming->getInputStreams().front());
|
||||
|
||||
parent.swap(child);
|
||||
}
|
||||
|
||||
/// Move ARRAY JOIN up if possible.
|
||||
static void tryLiftUpArrayJoin(QueryPlan::Node * parent_node, QueryPlan::Node * child_node, QueryPlan::Nodes & nodes)
|
||||
{
|
||||
auto & parent = parent_node->step;
|
||||
auto & child = child_node->step;
|
||||
auto * expression_step = typeid_cast<ExpressionStep *>(parent.get());
|
||||
auto * filter_step = typeid_cast<FilterStep *>(parent.get());
|
||||
auto * array_join_step = typeid_cast<ArrayJoinStep *>(child.get());
|
||||
|
||||
if (!(expression_step || filter_step) || !array_join_step)
|
||||
return;
|
||||
|
||||
const auto & array_join = array_join_step->arrayJoin();
|
||||
const auto & expression = expression_step ? expression_step->getExpression()
|
||||
: filter_step->getExpression();
|
||||
|
||||
auto split_actions = expression->splitActionsBeforeArrayJoin(array_join->columns);
|
||||
|
||||
/// No actions can be moved before ARRAY JOIN.
|
||||
if (split_actions.first->empty())
|
||||
return;
|
||||
|
||||
auto description = parent->getStepDescription();
|
||||
|
||||
/// All actions was moved before ARRAY JOIN. Swap Expression and ArrayJoin.
|
||||
if (split_actions.second->empty())
|
||||
{
|
||||
auto expected_header = parent->getOutputStream().header;
|
||||
|
||||
/// Expression/Filter -> ArrayJoin
|
||||
std::swap(parent, child);
|
||||
/// ArrayJoin -> Expression/Filter
|
||||
|
||||
if (expression_step)
|
||||
child = std::make_unique<ExpressionStep>(child_node->children.at(0)->step->getOutputStream(),
|
||||
std::move(split_actions.first));
|
||||
else
|
||||
child = std::make_unique<FilterStep>(child_node->children.at(0)->step->getOutputStream(),
|
||||
std::move(split_actions.first),
|
||||
filter_step->getFilterColumnName(),
|
||||
filter_step->removesFilterColumn());
|
||||
|
||||
child->setStepDescription(std::move(description));
|
||||
|
||||
array_join_step->updateInputStream(child->getOutputStream(), expected_header);
|
||||
return;
|
||||
}
|
||||
|
||||
/// Add new expression step before ARRAY JOIN.
|
||||
/// Expression/Filter -> ArrayJoin -> Something
|
||||
auto & node = nodes.emplace_back();
|
||||
node.children.swap(child_node->children);
|
||||
child_node->children.emplace_back(&node);
|
||||
/// Expression/Filter -> ArrayJoin -> node -> Something
|
||||
|
||||
node.step = std::make_unique<ExpressionStep>(node.children.at(0)->step->getOutputStream(),
|
||||
std::move(split_actions.first));
|
||||
node.step->setStepDescription(description);
|
||||
array_join_step->updateInputStream(node.step->getOutputStream(), {});
|
||||
|
||||
if (expression_step)
|
||||
parent = std::make_unique<ExpressionStep>(array_join_step->getOutputStream(), split_actions.second);
|
||||
else
|
||||
parent = std::make_unique<FilterStep>(array_join_step->getOutputStream(), split_actions.second,
|
||||
filter_step->getFilterColumnName(), filter_step->removesFilterColumn());
|
||||
|
||||
parent->setStepDescription(description + " [split]");
|
||||
}
|
||||
|
||||
/// Replace chain `ExpressionStep -> ExpressionStep` to single ExpressionStep
|
||||
/// Replace chain `FilterStep -> ExpressionStep` to single FilterStep
|
||||
static bool tryMergeExpressions(QueryPlan::Node * parent_node, QueryPlan::Node * child_node)
|
||||
{
|
||||
auto & parent = parent_node->step;
|
||||
auto & child = child_node->step;
|
||||
|
||||
auto * parent_expr = typeid_cast<ExpressionStep *>(parent.get());
|
||||
auto * parent_filter = typeid_cast<FilterStep *>(parent.get());
|
||||
auto * child_expr = typeid_cast<ExpressionStep *>(child.get());
|
||||
|
||||
if (parent_expr && child_expr)
|
||||
{
|
||||
const auto & child_actions = child_expr->getExpression();
|
||||
const auto & parent_actions = parent_expr->getExpression();
|
||||
|
||||
/// We cannot combine actions with arrayJoin and stateful function because we not always can reorder them.
|
||||
/// Example: select rowNumberInBlock() from (select arrayJoin([1, 2]))
|
||||
/// Such a query will return two zeroes if we combine actions together.
|
||||
if (child_actions->hasArrayJoin() && parent_actions->hasStatefulFunctions())
|
||||
return false;
|
||||
|
||||
auto merged = ActionsDAG::merge(std::move(*child_actions), std::move(*parent_actions));
|
||||
|
||||
auto expr = std::make_unique<ExpressionStep>(child_expr->getInputStreams().front(), merged);
|
||||
expr->setStepDescription("(" + parent_expr->getStepDescription() + " + " + child_expr->getStepDescription() + ")");
|
||||
|
||||
parent_node->step = std::move(expr);
|
||||
parent_node->children.swap(child_node->children);
|
||||
return true;
|
||||
}
|
||||
else if (parent_filter && child_expr)
|
||||
{
|
||||
const auto & child_actions = child_expr->getExpression();
|
||||
const auto & parent_actions = parent_filter->getExpression();
|
||||
|
||||
if (child_actions->hasArrayJoin() && parent_actions->hasStatefulFunctions())
|
||||
return false;
|
||||
|
||||
auto merged = ActionsDAG::merge(std::move(*child_actions), std::move(*parent_actions));
|
||||
|
||||
auto filter = std::make_unique<FilterStep>(child_expr->getInputStreams().front(), merged,
|
||||
parent_filter->getFilterColumnName(), parent_filter->removesFilterColumn());
|
||||
filter->setStepDescription("(" + parent_filter->getStepDescription() + " + " + child_expr->getStepDescription() + ")");
|
||||
|
||||
parent_node->step = std::move(filter);
|
||||
parent_node->children.swap(child_node->children);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/// Split FilterStep into chain `ExpressionStep -> FilterStep`, where FilterStep contains minimal number of nodes.
|
||||
static bool trySplitFilter(QueryPlan::Node * node, QueryPlan::Nodes & nodes)
|
||||
{
|
||||
auto * filter_step = typeid_cast<FilterStep *>(node->step.get());
|
||||
if (!filter_step)
|
||||
return false;
|
||||
|
||||
const auto & expr = filter_step->getExpression();
|
||||
|
||||
/// Do not split if there are function like runningDifference.
|
||||
if (expr->hasStatefulFunctions())
|
||||
return false;
|
||||
|
||||
auto split = expr->splitActionsForFilter(filter_step->getFilterColumnName());
|
||||
|
||||
if (split.second->empty())
|
||||
return false;
|
||||
|
||||
if (filter_step->removesFilterColumn())
|
||||
split.second->removeUnusedInput(filter_step->getFilterColumnName());
|
||||
|
||||
auto description = filter_step->getStepDescription();
|
||||
|
||||
auto & filter_node = nodes.emplace_back();
|
||||
node->children.swap(filter_node.children);
|
||||
node->children.push_back(&filter_node);
|
||||
|
||||
filter_node.step = std::make_unique<FilterStep>(
|
||||
filter_node.children.at(0)->step->getOutputStream(),
|
||||
std::move(split.first),
|
||||
filter_step->getFilterColumnName(),
|
||||
filter_step->removesFilterColumn());
|
||||
|
||||
node->step = std::make_unique<ExpressionStep>(filter_node.step->getOutputStream(), std::move(split.second));
|
||||
|
||||
filter_node.step->setStepDescription("(" + description + ")[split]");
|
||||
node->step->setStepDescription(description);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void QueryPlan::optimize()
|
||||
{
|
||||
/* Stack contains info for every nodes in the path from tree root to the current node.
|
||||
* Every optimization changes only current node and it's children.
|
||||
* Optimization may change QueryPlanStep, but not QueryPlan::Node (only add a new one).
|
||||
* So, QueryPlan::Node::children will be always valid.
|
||||
*/
|
||||
|
||||
struct Frame
|
||||
{
|
||||
Node * node;
|
||||
size_t next_child = 0;
|
||||
};
|
||||
|
||||
std::stack<Frame> stack;
|
||||
stack.push(Frame{.node = root});
|
||||
|
||||
while (!stack.empty())
|
||||
{
|
||||
auto & frame = stack.top();
|
||||
|
||||
if (frame.next_child == 0)
|
||||
{
|
||||
if (frame.node->children.size() == 1)
|
||||
{
|
||||
tryPushDownLimit(frame.node->step, frame.node->children.front());
|
||||
|
||||
while (tryMergeExpressions(frame.node, frame.node->children.front()));
|
||||
|
||||
if (frame.node->children.size() == 1)
|
||||
tryLiftUpArrayJoin(frame.node, frame.node->children.front(), nodes);
|
||||
|
||||
trySplitFilter(frame.node, nodes);
|
||||
}
|
||||
}
|
||||
|
||||
if (frame.next_child < frame.node->children.size())
|
||||
{
|
||||
stack.push(Frame{frame.node->children[frame.next_child]});
|
||||
++frame.next_child;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (frame.node->children.size() == 1)
|
||||
{
|
||||
while (tryMergeExpressions(frame.node, frame.node->children.front()));
|
||||
|
||||
trySplitFilter(frame.node, nodes);
|
||||
|
||||
tryLiftUpArrayJoin(frame.node, frame.node->children.front(), nodes);
|
||||
}
|
||||
|
||||
stack.pop();
|
||||
}
|
||||
}
|
||||
QueryPlanOptimizations::optimizeTree(*root, nodes);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -116,6 +116,11 @@ SRCS(
|
||||
QueryPlan/MergingFinal.cpp
|
||||
QueryPlan/MergingSortedStep.cpp
|
||||
QueryPlan/OffsetStep.cpp
|
||||
QueryPlan/Optimizations/liftUpArrayJoin.cpp
|
||||
QueryPlan/Optimizations/limitPushDown.cpp
|
||||
QueryPlan/Optimizations/mergeExpressions.cpp
|
||||
QueryPlan/Optimizations/optimizeTree.cpp
|
||||
QueryPlan/Optimizations/splitFilter.cpp
|
||||
QueryPlan/PartialSortingStep.cpp
|
||||
QueryPlan/QueryPlan.cpp
|
||||
QueryPlan/ReadFromPreparedSource.cpp
|
||||
|
Loading…
Reference in New Issue
Block a user