Split filter optimization.

This commit is contained in:
Nikolai Kochetov 2021-01-19 13:03:25 +03:00
parent 6e1a118642
commit b00f01d6b1
3 changed files with 94 additions and 10 deletions

View File

@ -454,6 +454,36 @@ bool ActionsDAG::tryRestoreColumn(const std::string & column_name)
return false;
}
void ActionsDAG::removeUnusedInput(const std::string & column_name)
{
auto it = inputs.begin();
for (; it != inputs.end(); ++it)
if ((*it)->result_name == column_name)
break;
if (it == inputs.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found input {} in ActionsDAG\n{}", column_name, dumpDAG());
auto * input = *it;
for (const auto & node : nodes)
for (const auto * child : node.children)
if (input == child)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot remove input {} because it has dependent nodes in ActionsDAG\n{}",
column_name, dumpDAG());
for (auto jt = index.begin(); jt != index.end(); ++jt)
{
if (*jt == input)
{
index.remove(jt);
break;
}
}
inputs.erase(it);
}
ActionsDAGPtr ActionsDAG::clone() const
{
auto actions = cloneEmpty();
@ -1067,4 +1097,16 @@ std::pair<ActionsDAGPtr, ActionsDAGPtr> ActionsDAG::splitActionsBeforeArrayJoin
return res;
}
std::pair<ActionsDAGPtr, ActionsDAGPtr> ActionsDAG::splitActionsForFilter(const std::string & column_name) const
{
auto it = index.find(column_name);
if (it == index.end())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Index for ActionsDAG does not contain filter column name {}. DAG:\n{}",
column_name, dumpDAG());
std::unordered_set<const Node *> split_nodes = {*it};
return split(split_nodes);
}
}

View File

@ -214,13 +214,13 @@ public:
/// If column is not in index, try to find it in nodes and insert back into index.
bool tryRestoreColumn(const std::string & column_name);
/// Find column in input. Remove it from input and index.
/// Checks that the column is in inputs and has no dependent nodes.
void removeUnusedInput(const std::string & column_name);
void projectInput() { settings.project_input = true; }
void removeUnusedActions(const Names & required_names);
/// Splits actions into two parts. Returned first half may be swapped with ARRAY JOIN.
std::pair<ActionsDAGPtr, ActionsDAGPtr> splitActionsBeforeArrayJoin(const NameSet & array_joined_columns) const;
bool hasArrayJoin() const;
bool hasStatefulFunctions() const;
bool empty() const; /// If actions only contain inputs.
@ -249,14 +249,25 @@ public:
MatchColumnsMode mode,
bool ignore_constant_values = false); /// Do not check that constants are same. Use value from result_header.
/// Create ActionsDAG which represents expression equivalent to applying lhs and rhs actions consequently.
/// Create ActionsDAG which represents expression equivalent to applying first and second actions consecutively.
/// Is used to replace `(first -> second)` expression chain to single `merge(first, second)` expression.
/// If first.settings.project_input is set, then outputs of `first` must include inputs of `second`.
/// Otherwise, any two actions may be combined.
static ActionsDAGPtr merge(ActionsDAG && first, ActionsDAG && second);
/// Split ActionsDAG into two DAGs, where first part contains all nodes from split_nodes and their children.
/// Execution of first then second parts on block is equivalent to execution of initial DAG.
/// First DAG and initial DAG have equal inputs, second DAG and initial DAG have equal index (outputs).
/// Second DAG may contain fewer inputs than first DAG (but may also include other columns).
std::pair<ActionsDAGPtr, ActionsDAGPtr> split(std::unordered_set<const Node *> split_nodes) const;
/// Splits actions into two parts. Returned first half may be swapped with ARRAY JOIN.
std::pair<ActionsDAGPtr, ActionsDAGPtr> splitActionsBeforeArrayJoin(const NameSet & array_joined_columns) const;
/// Splits actions into two parts. First part has minimal size sufficient for calculation of column_name.
/// Index of initial actions must contain column_name.
std::pair<ActionsDAGPtr, ActionsDAGPtr> splitActionsForFilter(const std::string & column_name) const;
private:
Node & addNode(Node node, bool can_replace = false);
Node & getNode(const std::string & name);

View File

@ -497,12 +497,13 @@ static void tryLiftUpArrayJoin(QueryPlan::Node * parent_node, QueryPlan::Node *
filter_step->getFilterColumnName(), filter_step->removesFilterColumn());
}
/// Replace the chain `ExpressionStep -> ExpressionStep` with a single ExpressionStep
/// Replace the chain `FilterStep -> ExpressionStep` with a single FilterStep
static bool tryMergeExpressions(QueryPlan::Node * parent_node, QueryPlan::Node * child_node)
{
auto & parent = parent_node->step;
auto & child = child_node->step;
/// TODO: FilterStep
auto * parent_expr = typeid_cast<ExpressionStep *>(parent.get());
auto * parent_filter = typeid_cast<FilterStep *>(parent.get());
auto * child_expr = typeid_cast<ExpressionStep *>(child.get());
@ -549,6 +550,36 @@ static bool tryMergeExpressions(QueryPlan::Node * parent_node, QueryPlan::Node *
return false;
}
/// Split FilterStep into chain `ExpressionStep -> FilterStep`, where FilterStep contains minimal number of nodes.
/// `node` is the plan node to examine; `nodes` is the storage where the new plan node is created.
/// Returns true if the plan was changed, false if `node` is not a FilterStep or the split is not applicable.
static bool trySplitFilter(QueryPlan::Node * node, QueryPlan::Nodes & nodes)
{
    auto * filter_step = typeid_cast<FilterStep *>(node->step.get());
    if (!filter_step)
        return false;

    const auto & expr = filter_step->getExpression();

    /// Do not split if there are stateful functions. After the split, the part of the expression
    /// which is not needed for the filter column is calculated by ExpressionStep on top of FilterStep,
    /// i.e. on already filtered rows, and the result of a stateful function may change.
    if (expr->hasStatefulFunctions())
        return false;

    auto split = expr->splitActionsForFilter(filter_step->getFilterColumnName());

    /// The second part only contains inputs, so there is nothing to move into a separate step.
    if (split.second->empty())
        return false;

    /// If the filter removes its column, the second part will not receive it and must not require it as input.
    if (filter_step->removesFilterColumn())
        split.second->removeUnusedInput(filter_step->getFilterColumnName());

    /// Insert the new node between `node` and its former children.
    auto & filter_node = nodes.emplace_back();
    node->children.swap(filter_node.children);
    node->children.push_back(&filter_node);

    filter_node.step = std::make_unique<FilterStep>(
        filter_node.children.at(0)->step->getOutputStream(),
        std::move(split.first),
        filter_step->getFilterColumnName(),
        filter_step->removesFilterColumn());

    /// This assignment destroys the old FilterStep, so `filter_step` and `expr` must not be used after it.
    node->step = std::make_unique<ExpressionStep>(filter_node.step->getOutputStream(), std::move(split.second));
    return true;
}
void QueryPlan::optimize()
{
struct Frame
@ -566,12 +597,16 @@ void QueryPlan::optimize()
if (frame.next_child == 0)
{
/// First entrance, try push down.
if (frame.node->children.size() == 1)
{
tryPushDownLimit(frame.node->step, frame.node->children.front());
while (tryMergeExpressions(frame.node, frame.node->children.front()));
if (frame.node->children.size() == 1)
tryLiftUpArrayJoin(frame.node, frame.node->children.front(), nodes);
trySplitFilter(frame.node, nodes);
}
}
@ -582,10 +617,6 @@ void QueryPlan::optimize()
}
else
{
/// Last entrance, try lift up.
if (frame.node->children.size() == 1)
tryLiftUpArrayJoin(frame.node, frame.node->children.front(), nodes);
stack.pop();
}
}