Fix filter push down columns order.

This commit is contained in:
Nikolai Kochetov 2021-03-11 13:34:15 +03:00
parent 2303f1788c
commit 738cb1af62
3 changed files with 27 additions and 10 deletions

View File

@ -1360,7 +1360,8 @@ ColumnsWithTypeAndName prepareFunctionArguments(const std::vector<ActionsDAG::No
///
/// Result actions add single column with conjunction result (it is always last in index).
/// No other columns are added or removed.
ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(std::vector<Node *> conjunction)
ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(std::vector<Node *> conjunction, const ColumnsWithTypeAndName & all_inputs)
{
if (conjunction.empty())
return nullptr;
@ -1374,6 +1375,7 @@ ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(std::vector<Node *> conjunc
std::make_shared<FunctionAnd>()));
std::unordered_map<const ActionsDAG::Node *, ActionsDAG::Node *> nodes_mapping;
std::unordered_map<std::string, std::list<Node *>> added_inputs;
struct Frame
{
@ -1416,16 +1418,30 @@ ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(std::vector<Node *> conjunc
child = nodes_mapping[child];
if (node.type == ActionType::INPUT)
{
actions->inputs.emplace_back(&node);
actions->index.insert(&node);
}
added_inputs[node.result_name].push_back(&node);
stack.pop();
}
}
}
for (const auto & col : all_inputs)
{
Node * input;
auto & list = added_inputs[col.name];
if (list.empty())
input = &const_cast<Node &>(actions->addInput(col));
else
{
input = list.front();
list.pop_front();
actions->inputs.push_back(input);
}
actions->index.insert(input);
}
Node * result_predicate = nodes_mapping[*conjunction.begin()];
if (conjunction.size() > 1)
@ -1442,7 +1458,7 @@ ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(std::vector<Node *> conjunc
return actions;
}
ActionsDAGPtr ActionsDAG::splitActionsForFilter(const std::string & filter_name, bool can_remove_filter, const Names & available_inputs)
ActionsDAGPtr ActionsDAG::splitActionsForFilter(const std::string & filter_name, bool can_remove_filter, const Names & available_inputs, const ColumnsWithTypeAndName & all_inputs)
{
Node * predicate;
@ -1480,7 +1496,7 @@ ActionsDAGPtr ActionsDAG::splitActionsForFilter(const std::string & filter_name,
}
auto conjunction = getConjunctionNodes(predicate, allowed_nodes);
auto actions = cloneActionsForConjunction(conjunction.allowed);
auto actions = cloneActionsForConjunction(conjunction.allowed, all_inputs);
if (!actions)
return nullptr;

View File

@ -292,7 +292,7 @@ public:
/// Otherwise, return actions which inputs are from available_inputs.
/// Returned actions add single column which may be used for filter.
/// Also, replace some nodes of current inputs to constant 1 in case they are filtered.
ActionsDAGPtr splitActionsForFilter(const std::string & filter_name, bool can_remove_filter, const Names & available_inputs);
ActionsDAGPtr splitActionsForFilter(const std::string & filter_name, bool can_remove_filter, const Names & available_inputs, const ColumnsWithTypeAndName & all_inputs);
private:
Node & addNode(Node node, bool can_replace = false, bool add_to_index = true);
@ -323,7 +323,7 @@ private:
void compileFunctions();
ActionsDAGPtr cloneActionsForConjunction(std::vector<Node *> conjunction);
ActionsDAGPtr cloneActionsForConjunction(std::vector<Node *> conjunction, const ColumnsWithTypeAndName & all_inputs);
};

View File

@ -43,7 +43,8 @@ static size_t tryAddNewFilterStep(
// std::cerr << "Filter: \n" << expression->dumpDAG() << std::endl;
auto split_filter = expression->splitActionsForFilter(filter_column_name, removes_filter, allowed_inputs);
const auto & all_inputs = child->getInputStreams().front().header.getColumnsWithTypeAndName();
auto split_filter = expression->splitActionsForFilter(filter_column_name, removes_filter, allowed_inputs, all_inputs);
if (!split_filter)
return 0;