Merge pull request #21841 from ClickHouse/fix-21773

Fix wrong column order after  filter push down.
This commit is contained in:
Nikolai Kochetov 2021-03-18 13:30:22 +03:00 committed by GitHub
commit 03cb0299a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 118 additions and 15 deletions

View File

@ -91,7 +91,7 @@ const ActionsDAG::Node & ActionsDAG::addInput(std::string name, DataTypePtr type
return addNode(std::move(node), can_replace, add_to_index);
}
const ActionsDAG::Node & ActionsDAG::addInput(ColumnWithTypeAndName column, bool can_replace)
const ActionsDAG::Node & ActionsDAG::addInput(ColumnWithTypeAndName column, bool can_replace, bool add_to_index)
{
Node node;
node.type = ActionType::INPUT;
@ -99,7 +99,7 @@ const ActionsDAG::Node & ActionsDAG::addInput(ColumnWithTypeAndName column, bool
node.result_name = std::move(column.name);
node.column = std::move(column.column);
return addNode(std::move(node), can_replace);
return addNode(std::move(node), can_replace, add_to_index);
}
const ActionsDAG::Node & ActionsDAG::addColumn(ColumnWithTypeAndName column, bool can_replace, bool materialize)
@ -1360,7 +1360,7 @@ ColumnsWithTypeAndName prepareFunctionArguments(const std::vector<ActionsDAG::No
///
/// Result actions add single column with conjunction result (it is always last in index).
/// No other columns are added or removed.
ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(std::vector<Node *> conjunction)
ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(std::vector<Node *> conjunction, const ColumnsWithTypeAndName & all_inputs)
{
if (conjunction.empty())
return nullptr;
@ -1374,6 +1374,7 @@ ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(std::vector<Node *> conjunc
std::make_shared<FunctionAnd>()));
std::unordered_map<const ActionsDAG::Node *, ActionsDAG::Node *> nodes_mapping;
std::unordered_map<std::string, std::list<Node *>> required_inputs;
struct Frame
{
@ -1416,16 +1417,31 @@ ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(std::vector<Node *> conjunc
child = nodes_mapping[child];
if (node.type == ActionType::INPUT)
{
actions->inputs.emplace_back(&node);
actions->index.insert(&node);
}
required_inputs[node.result_name].push_back(&node);
stack.pop();
}
}
}
/// Actions must have the same inputs as in all_inputs list.
/// See comment to cloneActionsForFilterPushDown.
for (const auto & col : all_inputs)
{
Node * input;
auto & list = required_inputs[col.name];
if (list.empty())
input = &const_cast<Node &>(actions->addInput(col, true, false));
else
{
input = list.front();
list.pop_front();
actions->inputs.push_back(input);
}
actions->index.insert(input);
}
Node * result_predicate = nodes_mapping[*conjunction.begin()];
if (conjunction.size() > 1)
@ -1442,7 +1458,11 @@ ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(std::vector<Node *> conjunc
return actions;
}
ActionsDAGPtr ActionsDAG::splitActionsForFilter(const std::string & filter_name, bool can_remove_filter, const Names & available_inputs)
ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
const std::string & filter_name,
bool can_remove_filter,
const Names & available_inputs,
const ColumnsWithTypeAndName & all_inputs)
{
Node * predicate;
@ -1480,7 +1500,7 @@ ActionsDAGPtr ActionsDAG::splitActionsForFilter(const std::string & filter_name,
}
auto conjunction = getConjunctionNodes(predicate, allowed_nodes);
auto actions = cloneActionsForConjunction(conjunction.allowed);
auto actions = cloneActionsForConjunction(conjunction.allowed, all_inputs);
if (!actions)
return nullptr;

View File

@ -120,8 +120,31 @@ public:
/// Insert method doesn't check if map already have node with the same name.
/// If node with the same name exists, it is removed from map, but not list.
/// It is expected and used for project(), when result may have several columns with the same name.
void insert(Node * node) { map[node->result_name] = list.emplace(list.end(), node); }
void prepend(Node * node) { map[node->result_name] = list.emplace(list.begin(), node); }
void insert(Node * node)
{
auto it = list.emplace(list.end(), node);
if (auto handle = map.extract(node->result_name))
{
handle.key() = node->result_name; /// Change string_view
handle.mapped() = it;
map.insert(std::move(handle));
}
else
map[node->result_name] = it;
}
void prepend(Node * node)
{
auto it = list.emplace(list.begin(), node);
if (auto handle = map.extract(node->result_name))
{
handle.key() = node->result_name; /// Change string_view
handle.mapped() = it;
map.insert(std::move(handle));
}
else
map[node->result_name] = it;
}
/// If node with same name exists in index, replace it. Otherwise insert new node to index.
void replace(Node * node)
@ -200,7 +223,7 @@ public:
std::string dumpDAG() const;
const Node & addInput(std::string name, DataTypePtr type, bool can_replace = false, bool add_to_index = true);
const Node & addInput(ColumnWithTypeAndName column, bool can_replace = false);
const Node & addInput(ColumnWithTypeAndName column, bool can_replace = false, bool add_to_index = true);
const Node & addColumn(ColumnWithTypeAndName column, bool can_replace = false, bool materialize = false);
const Node & addAlias(const std::string & name, std::string alias, bool can_replace = false);
const Node & addArrayJoin(const std::string & source_name, std::string result_name);
@ -292,7 +315,23 @@ public:
/// Otherwise, return actions which inputs are from available_inputs.
/// Returned actions add single column which may be used for filter.
/// Also, replace some nodes of current inputs to constant 1 in case they are filtered.
ActionsDAGPtr splitActionsForFilter(const std::string & filter_name, bool can_remove_filter, const Names & available_inputs);
///
/// @param all_inputs should contain inputs from previous step, which will be used for result actions.
/// It is expected that all_inputs contain columns from available_inputs.
/// This parameter is needed to enforce result actions save columns order in block.
/// Otherwise for some queries, e.g. with GROUP BY, columns will be mixed.
/// Example: SELECT sum(x), y, z FROM tab WHERE z > 0 and sum(x) > 0
/// Pushed condition: z > 0
/// GROUP BY step will transform columns `x, y, z` -> `sum(x), y, z`
/// If we just add filter step with actions `z -> z > 0` before GROUP BY,
/// columns will be transformed like `x, y, z` -> `z, z > 0, x, y` -(remove filter)-> `z, x, y`.
/// To avoid it, add inputs from `all_inputs` list,
/// so actions `x, y, z -> x, y, z, z > 0` -(remove filter)-> `x, y, z` will not change columns order.
ActionsDAGPtr cloneActionsForFilterPushDown(
const std::string & filter_name,
bool can_remove_filter,
const Names & available_inputs,
const ColumnsWithTypeAndName & all_inputs);
private:
Node & addNode(Node node, bool can_replace = false, bool add_to_index = true);
@ -323,7 +362,7 @@ private:
void compileFunctions();
ActionsDAGPtr cloneActionsForConjunction(std::vector<Node *> conjunction);
ActionsDAGPtr cloneActionsForConjunction(std::vector<Node *> conjunction, const ColumnsWithTypeAndName & all_inputs);
};

View File

@ -43,7 +43,8 @@ static size_t tryAddNewFilterStep(
// std::cerr << "Filter: \n" << expression->dumpDAG() << std::endl;
auto split_filter = expression->splitActionsForFilter(filter_column_name, removes_filter, allowed_inputs);
const auto & all_inputs = child->getInputStreams().front().header.getColumnsWithTypeAndName();
auto split_filter = expression->cloneActionsForFilterPushDown(filter_column_name, removes_filter, allowed_inputs, all_inputs);
if (!split_filter)
return 0;

View File

@ -0,0 +1,6 @@
1 2
1 2
[1] 2
[[1]] 2
String1_0 String2_0 String3_0 String4_0 1
String1_0 String2_0 String3_0 String4_0 1

View File

@ -0,0 +1,37 @@
SELECT * FROM (SELECT col1, col2 FROM (select '1' as col1, '2' as col2) GROUP by col1, col2) AS expr_qry WHERE col2 != '';
SELECT * FROM (SELECT materialize('1') AS s1, materialize('2') AS s2 GROUP BY s1, s2) WHERE s2 = '2';
SELECT * FROM (SELECT materialize([1]) AS s1, materialize('2') AS s2 GROUP BY s1, s2) WHERE s2 = '2';
SELECT * FROM (SELECT materialize([[1]]) AS s1, materialize('2') AS s2 GROUP BY s1, s2) WHERE s2 = '2';
DROP TABLE IF EXISTS Test;
CREATE TABLE Test
ENGINE = MergeTree()
PRIMARY KEY (String1,String2)
ORDER BY (String1,String2)
AS
SELECT
'String1_' || toString(number) as String1,
'String2_' || toString(number) as String2,
'String3_' || toString(number) as String3,
'String4_' || toString(number%4) as String4
FROM numbers(1);
SELECT *
FROM
(
SELECT String1,String2,String3,String4,COUNT(*)
FROM Test
GROUP by String1,String2,String3,String4
) AS expr_qry;
SELECT *
FROM
(
SELECT String1,String2,String3,String4,COUNT(*)
FROM Test
GROUP by String1,String2,String3,String4
) AS expr_qry
WHERE String4 ='String4_0';
DROP TABLE IF EXISTS Test;