Rewrite tryPushDownFilter for join with lambda

This commit is contained in:
vdimir 2022-07-13 12:06:29 +00:00
parent 549a85fee9
commit 4124dc9ac4
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862

View File

@ -1,3 +1,10 @@
#include <Columns/IColumn.h>
#include <Common/logger_useful.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/QueryPlan/FilterStep.h>
@ -11,13 +18,10 @@
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/TableJoin.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/IColumn.h>
namespace DB::ErrorCodes
{
@ -216,43 +220,52 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
if (auto * join = typeid_cast<JoinStep *>(child.get()))
{
const auto & table_join = join->getJoin()->getTableJoin();
std::vector<ASTTableJoin::Kind> kinds = {ASTTableJoin::Kind::Left};
/// For not full sorting merge join push down is for left table only, because left and right streams are not independent.
/// Only inner and left(/right) join are supported. Other types may generate default values for left table keys.
/// So, if we push down a condition like `key != 0`, not all rows may be filtered.
if (join->allowPushDownToRight())
kinds.emplace_back(ASTTableJoin::Kind::Right);
for (const auto kind : kinds)
auto join_push_down = [&](ASTTableJoin::Kind kind) -> size_t
{
if (table_join.kind() == ASTTableJoin::Kind::Inner || table_join.kind() == kind)
const auto & table_join = join->getJoin()->getTableJoin();
/// Only inner and left(/right) join are supported. Other types may generate default values for left table keys.
/// So, if we push down a condition like `key != 0`, not all rows may be filtered.
if (table_join.kind() != ASTTableJoin::Kind::Inner && table_join.kind() != kind)
return 0;
bool is_left = kind == ASTTableJoin::Kind::Left;
const auto & input_header = is_left ? join->getInputStreams().front().header : join->getInputStreams().back().header;
const auto & res_header = join->getOutputStream().header;
Names allowed_keys;
const auto & source_columns = input_header.getNames();
for (const auto & name : source_columns)
{
const auto & streams = join->getInputStreams();
const auto & input_header = kind == ASTTableJoin::Kind::Left ? streams.front().header : streams.back().header;
const auto & res_header = join->getOutputStream().header;
Names allowed_keys;
const auto & source_columns = input_header.getNames();
for (const auto & name : source_columns)
{
/// Skip key if it is renamed.
/// I don't know if it is possible. Just in case.
if (!input_header.has(name) || !res_header.has(name))
continue;
/// Skip key if it is renamed.
/// I don't know if it is possible. Just in case.
if (!input_header.has(name) || !res_header.has(name))
continue;
/// Skip if type is changed. Push down expression expect equal types.
if (!input_header.getByName(name).type->equals(*res_header.getByName(name).type))
continue;
/// Skip if type is changed. Push down expression expect equal types.
if (!input_header.getByName(name).type->equals(*res_header.getByName(name).type))
continue;
allowed_keys.push_back(name);
}
const bool can_remove_filter
= std::find(source_columns.begin(), source_columns.end(), filter->getFilterColumnName()) == source_columns.end();
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_keys, can_remove_filter, kind == ASTTableJoin::Kind::Left ? 0 : 1))
return updated_steps;
allowed_keys.push_back(name);
}
const bool can_remove_filter
= std::find(source_columns.begin(), source_columns.end(), filter->getFilterColumnName()) == source_columns.end();
size_t updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_keys, can_remove_filter, is_left ? 0 : 1);
if (updated_steps > 0)
{
LOG_DEBUG(&Poco::Logger::get("tryPushDownFilter"), "Pushed down filter to {} side of join", kind);
}
return updated_steps;
};
if (size_t updated_steps = join_push_down(ASTTableJoin::Kind::Left))
return updated_steps;
/// For full sorting merge join we push down both to the left and right tables, because left and right streams are not independent.
if (join->allowPushDownToRight())
{
if (size_t updated_steps = join_push_down(ASTTableJoin::Kind::Right))
return updated_steps;
}
}