Merge pull request #39575 from vdimir/join_pushdown_column_not_found

Fix column not found for push down with join
This commit is contained in:
Vladimir C 2022-08-08 14:04:42 +02:00 committed by GitHub
commit 6bd4821656
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 77 additions and 21 deletions

View File

@ -42,37 +42,50 @@ static bool filterColumnIsNotAmongAggregatesArguments(const AggregateDescription
return true;
}
static size_t
tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, const Names & allowed_inputs,
bool can_remove_filter = true, size_t child_idx = 0)
/// Assert that `node` has at least `child_num` children and that its step has at least
/// `child_num` input streams.
/// Throws an Exception with ErrorCodes::LOGICAL_ERROR if either count is smaller than `child_num`.
static void checkChildrenSize(QueryPlan::Node * node, size_t child_num)
{
    const auto & step = node->step;
    const size_t streams_count = step->getInputStreams().size();
    const size_t children_count = node->children.size();

    if (child_num > streams_count || child_num > children_count)
        /// The message reports the number of children first and the number of streams second,
        /// so the arguments must be passed in that order.
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong number of children: expected at least {}, got {} children and {} streams",
                        child_num, children_count, streams_count);
}
/// Builds the part of the parent filter expression that can be pushed below the child step.
///
/// Takes the first child of `parent_node`, treats `parent_node->step` as a FilterStep and calls
/// `cloneActionsForFilterPushDown` on its expression, passing `allowed_inputs` and the columns of
/// the header of the child's input stream number `child_idx`.
///
/// Returns the result of `cloneActionsForFilterPushDown`; callers in this file treat a null result
/// as "nothing to push down".
///
/// NOTE(review): this listing looks like a rendered diff, so removed and added variants of some
/// lines appear side by side (two `filter` declarations, the explicit range check next to
/// `checkChildrenSize`, `return 0;` next to `return split_filter;`) - confirm against the real file.
static ActionsDAGPtr splitFilter(QueryPlan::Node * parent_node, const Names & allowed_inputs, size_t child_idx = 0)
{
QueryPlan::Node * child_node = parent_node->children.front();
/// `child_idx` is used below as an index, so at least `child_idx + 1` children/streams are required.
checkChildrenSize(child_node, child_idx + 1);
auto & parent = parent_node->step;
auto & child = child_node->step;
auto * filter = static_cast<FilterStep *>(parent.get());
auto * filter = assert_cast<FilterStep *>(parent.get());
const auto & expression = filter->getExpression();
const auto & filter_column_name = filter->getFilterColumnName();
bool removes_filter = filter->removesFilterColumn();
// std::cerr << "Filter: \n" << expression->dumpDAG() << std::endl;
if (child_idx >= child->getInputStreams().size() || child_idx >= child_node->children.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Child index {} is out of range (streams: {}, children: {})",
child_idx, child->getInputStreams().size(), child_node->children.size());
/// Columns of the selected input stream of the child step.
const auto & all_inputs = child->getInputStreams()[child_idx].header.getColumnsWithTypeAndName();
auto split_filter = expression->cloneActionsForFilterPushDown(filter_column_name, removes_filter, allowed_inputs, all_inputs);
if (!split_filter)
return 0;
return split_filter;
}
// std::cerr << "===============\n" << expression->dumpDAG() << std::endl;
// std::cerr << "---------------\n" << split_filter->dumpDAG() << std::endl;
static size_t
tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, const ActionsDAGPtr & split_filter,
bool can_remove_filter = true, size_t child_idx = 0)
{
QueryPlan::Node * child_node = parent_node->children.front();
checkChildrenSize(child_node, child_idx + 1);
auto & parent = parent_node->step;
auto & child = child_node->step;
auto * filter = assert_cast<FilterStep *>(parent.get());
const auto & expression = filter->getExpression();
const auto & filter_column_name = filter->getFilterColumnName();
const auto * filter_node = expression->tryFindInIndex(filter_column_name);
if (!filter_node && !removes_filter)
if (!filter_node && !filter->removesFilterColumn())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Filter column {} was removed from ActionsDAG but it is needed in result. DAG:\n{}",
filter_column_name, expression->dumpDAG());
@ -89,9 +102,9 @@ tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, con
/// Expression/Filter -> Aggregating -> Filter -> Something
/// New filter column is the first one.
auto split_filter_column_name = (*split_filter->getIndex().begin())->result_name;
String split_filter_column_name = split_filter->getIndex().front()->result_name;
node.step = std::make_unique<FilterStep>(
node.children.at(0)->step->getOutputStream(), std::move(split_filter), std::move(split_filter_column_name), can_remove_filter);
node.children.at(0)->step->getOutputStream(), split_filter, std::move(split_filter_column_name), can_remove_filter);
if (auto * transforming_step = dynamic_cast<ITransformingStep *>(child.get()))
{
@ -118,6 +131,15 @@ tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, con
return 3;
}
/// Convenience overload for pushing a filter down to the first child (index 0).
/// Splits the parent filter by `allowed_inputs`; if nothing could be split off, returns 0,
/// otherwise forwards the split filter to the `ActionsDAGPtr` overload and returns its result.
static size_t
tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, const Names & allowed_inputs,
    bool can_remove_filter = true)
{
    ActionsDAGPtr split_filter = splitFilter(parent_node, allowed_inputs, 0);

    /// Nothing can be pushed down: no steps were updated.
    if (!split_filter)
        return 0;

    return tryAddNewFilterStep(parent_node, nodes, split_filter, can_remove_filter, 0);
}
size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes)
{
if (parent_node->children.size() != 1)
@ -248,12 +270,26 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
allowed_keys.push_back(name);
}
const bool can_remove_filter
= std::find(source_columns.begin(), source_columns.end(), filter->getFilterColumnName()) == source_columns.end();
size_t updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_keys, can_remove_filter, is_left ? 0 : 1);
/// For left JOIN, push down to the first child; for right - to the second one.
const auto child_idx = is_left ? 0 : 1;
ActionsDAGPtr split_filter = splitFilter(parent_node, allowed_keys, child_idx);
if (!split_filter)
return 0;
/*
* We should check the presence of a split filter column name in `source_columns` to avoid removing the required column.
*
* Example:
* A filter expression is `a AND b = c`, but `b` and `c` belong to another side of the join and not in `allowed_keys`, so the final split filter is just `a`.
* In this case `a` can be in `source_columns` but not `and(a, equals(b, c))`.
*
* New filter column is the first one.
*/
const String & split_filter_column_name = split_filter->getIndex().front()->result_name;
bool can_remove_filter = source_columns.end() == std::find(source_columns.begin(), source_columns.end(), split_filter_column_name);
const size_t updated_steps = tryAddNewFilterStep(parent_node, nodes, split_filter, can_remove_filter, child_idx);
if (updated_steps > 0)
{
LOG_DEBUG(&Poco::Logger::get("tryPushDownFilter"), "Pushed down filter to {} side of join", kind);
LOG_DEBUG(&Poco::Logger::get("QueryPlanOptimizations"), "Pushed down filter to {} side of join", kind);
}
return updated_steps;
};

View File

@ -0,0 +1,4 @@
1
1
1
1

View File

@ -0,0 +1,16 @@
-- Regression test for filter push-down with JOIN: the WHERE clause combines a column of the
-- left subquery (`deleted`) with a comparison across both sides of the join (`b = c`).
-- The same query is run once for every join algorithm listed in the loop below.
-- NOTE(review): the accompanying reference output appears to be one row containing `1` per algorithm.
{% for join_algorithm in ['default', 'full_sorting_merge', 'hash', 'partial_merge'] -%}
SET join_algorithm = '{{ join_algorithm }}';
SELECT deleted
FROM (
SELECT 1 AS deleted, 'k' AS a, 'v' AS b
) AS q
INNER JOIN (
SELECT 'k' AS a, 'v' AS c
) AS s
ON q.a = s.a
WHERE deleted AND (b = c);
{% endfor -%}