Filter push down for LEFT/INNER JOIN, CreatingSets, Union.

This commit is contained in:
Nikolai Kochetov 2021-03-25 21:11:54 +03:00
parent 3efd11becc
commit e9f7213858
11 changed files with 185 additions and 4 deletions

2
contrib/libpq vendored

@ -1 +1 @@
Subproject commit c7624588ddd84f153dd5990e81b886e4568bddde Subproject commit 1f9c286dba60809edb64e384d6727d80d269b6cf

View File

@ -134,6 +134,8 @@ class HashJoin : public IJoin
public: public:
HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_ = false); HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_ = false);
const TableJoin & getTableJoin() const override { return *table_join; }
/** Add block of data from right hand of JOIN to the map. /** Add block of data from right hand of JOIN to the map.
* Returns false, if some limit was exceeded and you should not insert more data. * Returns false, if some limit was exceeded and you should not insert more data.
*/ */

View File

@ -14,11 +14,15 @@ class Block;
struct ExtraBlock; struct ExtraBlock;
using ExtraBlockPtr = std::shared_ptr<ExtraBlock>; using ExtraBlockPtr = std::shared_ptr<ExtraBlock>;
class TableJoin;
class IJoin class IJoin
{ {
public: public:
virtual ~IJoin() = default; virtual ~IJoin() = default;
virtual const TableJoin & getTableJoin() const = 0;
/// Add block of data from right hand of JOIN. /// Add block of data from right hand of JOIN.
/// @returns false, if some limit was exceeded and you should not insert more data. /// @returns false, if some limit was exceeded and you should not insert more data.
virtual bool addJoinedBlock(const Block & block, bool check_limits = true) = 0; virtual bool addJoinedBlock(const Block & block, bool check_limits = true) = 0;

View File

@ -19,6 +19,8 @@ class JoinSwitcher : public IJoin
public: public:
JoinSwitcher(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_); JoinSwitcher(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_);
const TableJoin & getTableJoin() const override { return *table_join; }
/// Add block of data from right hand of JOIN into current join object. /// Add block of data from right hand of JOIN into current join object.
/// If join-in-memory memory limit exceeded switches to join-on-disk and continue with it. /// If join-in-memory memory limit exceeded switches to join-on-disk and continue with it.
/// @returns false, if join-on-disk disk limit exceeded /// @returns false, if join-on-disk disk limit exceeded

View File

@ -23,6 +23,7 @@ class MergeJoin : public IJoin
public: public:
MergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block); MergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block);
const TableJoin & getTableJoin() const override { return *table_join; }
bool addJoinedBlock(const Block & block, bool check_limits) override; bool addJoinedBlock(const Block & block, bool check_limits) override;
void joinBlock(Block &, ExtraBlockPtr & not_processed) override; void joinBlock(Block &, ExtraBlockPtr & not_processed) override;
void joinTotals(Block &) const override; void joinTotals(Block &) const override;

View File

@ -45,6 +45,8 @@ public:
void transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) override; void transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) override;
const JoinPtr & getJoin() const { return join; }
private: private:
JoinPtr join; JoinPtr join;
bool has_non_joined_rows; bool has_non_joined_rows;

View File

@ -4,6 +4,7 @@
#include <Processors/QueryPlan/AggregatingStep.h> #include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h> #include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h> #include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/CubeStep.h> #include <Processors/QueryPlan/CubeStep.h>
#include <Processors/QueryPlan/FinishSortingStep.h> #include <Processors/QueryPlan/FinishSortingStep.h>
#include <Processors/QueryPlan/MergeSortingStep.h> #include <Processors/QueryPlan/MergeSortingStep.h>
@ -11,8 +12,10 @@
#include <Processors/QueryPlan/PartialSortingStep.h> #include <Processors/QueryPlan/PartialSortingStep.h>
#include <Processors/QueryPlan/TotalsHavingStep.h> #include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/DistinctStep.h> #include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Interpreters/ActionsDAG.h> #include <Interpreters/ActionsDAG.h>
#include <Interpreters/ArrayJoinAction.h> #include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/TableJoin.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <DataTypes/DataTypeAggregateFunction.h> #include <DataTypes/DataTypeAggregateFunction.h>
@ -82,7 +85,7 @@ static size_t tryAddNewFilterStep(
return 3; return 3;
} }
static Names getAggregatinKeys(const Aggregator::Params & params) static Names getAggregatingKeys(const Aggregator::Params & params)
{ {
Names keys; Names keys;
keys.reserve(params.keys.size()); keys.reserve(params.keys.size());
@ -112,17 +115,36 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
if (auto * aggregating = typeid_cast<AggregatingStep *>(child.get())) if (auto * aggregating = typeid_cast<AggregatingStep *>(child.get()))
{ {
const auto & params = aggregating->getParams(); const auto & params = aggregating->getParams();
Names keys = getAggregatinKeys(params); Names keys = getAggregatingKeys(params);
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, keys)) if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, keys))
return updated_steps; return updated_steps;
} }
if (typeid_cast<CreatingSetsStep *>(child.get()))
{
/// CreatingSets does not change header.
/// We can push down filter and update header.
/// - Something
/// Filter - CreatingSets - CreatingSet
/// - CreatingSet
auto input_streams = child->getInputStreams();
input_streams.front() = filter->getOutputStream();
child = std::make_unique<CreatingSetsStep>(input_streams);
std::swap(parent, child);
std::swap(parent_node->children, child_node->children);
std::swap(parent_node->children.front(), child_node->children.front());
/// - Filter - Something
/// CreatingSets - CreatingSet
/// - CreatingSet
return 2;
}
if (auto * totals_having = typeid_cast<TotalsHavingStep *>(child.get())) if (auto * totals_having = typeid_cast<TotalsHavingStep *>(child.get()))
{ {
/// If totals step has HAVING expression, skip it for now. /// If totals step has HAVING expression, skip it for now.
/// TODO: /// TODO:
/// We can merge HAING expression with current filer. /// We can merge HAVING expression with current filer.
/// Also, we can push down part of HAVING which depend only on aggregation keys. /// Also, we can push down part of HAVING which depend only on aggregation keys.
if (totals_having->getActions()) if (totals_having->getActions())
return 0; return 0;
@ -168,6 +190,36 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
return updated_steps; return updated_steps;
} }
if (auto * join = typeid_cast<JoinStep *>(child.get()))
{
const auto & table_join = join->getJoin()->getTableJoin();
/// Push down is for left table only. We need to update JoinStep for push down into right.
/// Only inner and left join are supported. Other types may generate default values for left table keys.
/// So, if we push down a condition like `key != 0`, not all rows may be filtered.
if (table_join.kind() == ASTTableJoin::Kind::Inner || table_join.kind() == ASTTableJoin::Kind::Left)
{
const auto & left_header = join->getInputStreams().front().header;
const auto & res_header = join->getOutputStream().header;
Names allowed_keys;
for (const auto & name : table_join.keyNamesLeft())
{
/// Skip key if it is renamed.
/// I don't know if it is possible. Just in case.
if (!left_header.has(name) || !res_header.has(name))
continue;
/// Skip if type is changed. Push down expression expect equal types.
if (!left_header.getByName(name).type->equals(*res_header.getByName(name).type))
continue;
allowed_keys.push_back(name);
}
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_keys))
return updated_steps;
}
}
/// TODO. /// TODO.
/// We can filter earlier if expression does not depend on WITH FILL columns. /// We can filter earlier if expression does not depend on WITH FILL columns.
/// But we cannot just push down condition, because other column may be filled with defaults. /// But we cannot just push down condition, because other column may be filled with defaults.
@ -193,6 +245,48 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
return updated_steps; return updated_steps;
} }
if (auto * union_step = typeid_cast<UnionStep *>(child.get()))
{
/// Union does not change header.
/// We can push down filter and update header.
auto union_input_streams = child->getInputStreams();
for (auto & input_stream : union_input_streams)
input_stream.header = filter->getOutputStream().header;
/// - Something
/// Filter - Union - Something
/// - Something
child = std::make_unique<UnionStep>(union_input_streams, union_step->getMaxThreads());
std::swap(parent, child);
std::swap(parent_node->children, child_node->children);
std::swap(parent_node->children.front(), child_node->children.front());
/// - Filter - Something
/// Union - Something
/// - Something
for (size_t i = 1; i < parent_node->children.size(); ++i)
{
auto & filter_node = nodes.emplace_back();
filter_node.children.push_back(parent_node->children[i]);
parent_node->children[i] = &filter_node;
filter_node.step = std::make_unique<FilterStep>(
filter_node.children.front()->step->getOutputStream(),
filter->getExpression()->clone(),
filter->getFilterColumnName(),
filter->removesFilterColumn());
}
/// - Filter - Something
/// Union - Filter - Something
/// - Filter - Something
return 3;
}
return 0; return 0;
} }

View File

@ -6,6 +6,11 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static Block checkHeaders(const DataStreams & input_streams) static Block checkHeaders(const DataStreams & input_streams)
{ {
if (input_streams.empty()) if (input_streams.empty())

View File

@ -17,6 +17,8 @@ public:
void describePipeline(FormatSettings & settings) const override; void describePipeline(FormatSettings & settings) const override;
size_t getMaxThreads() const { return max_threads; }
private: private:
Block header; Block header;
size_t max_threads; size_t max_threads;

View File

@ -123,3 +123,26 @@ Filter column: notEquals(y, 2)
3 10 3 10
0 37 0 37
> filter is pushed down before CreatingSets
CreatingSets
Filter
Filter
1
3
> one condition of filter is pushed down before LEFT JOIN
Join
Filter column: notEquals(number, 1)
Join
0 0
3 3
> one condition of filter is pushed down before INNER JOIN
Join
Filter column: notEquals(number, 1)
Join
3 3
> filter is pushed down before UNION
Union
Filter
Filter
2 3
2 3

View File

@ -150,3 +150,49 @@ $CLICKHOUSE_CLIENT -q "
select * from ( select * from (
select y, sum(x) from (select number as x, number % 4 as y from numbers(10)) group by y with totals select y, sum(x) from (select number as x, number % 4 as y from numbers(10)) group by y with totals
) where y != 2" ) where y != 2"
echo "> filter is pushed down before CreatingSets"
$CLICKHOUSE_CLIENT -q "
explain select number from (
select number from numbers(5) where number in (select 1 + number from numbers(3))
) where number != 2 settings enable_optimize_predicate_expression=0" |
grep -o "CreatingSets\|Filter"
$CLICKHOUSE_CLIENT -q "
select number from (
select number from numbers(5) where number in (select 1 + number from numbers(3))
) where number != 2 settings enable_optimize_predicate_expression=0"
echo "> one condition of filter is pushed down before LEFT JOIN"
$CLICKHOUSE_CLIENT -q "
explain actions = 1
select number as a, r.b from numbers(4) as l any left join (
select number + 2 as b from numbers(3)
) as r on a = r.b where a != 1 and b != 2 settings enable_optimize_predicate_expression = 0" |
grep -o "Join\|Filter column: notEquals(number, 1)"
$CLICKHOUSE_CLIENT -q "
select number as a, r.b from numbers(4) as l any left join (
select number + 2 as b from numbers(3)
) as r on a = r.b where a != 1 and b != 2 settings enable_optimize_predicate_expression = 0"
echo "> one condition of filter is pushed down before INNER JOIN"
$CLICKHOUSE_CLIENT -q "
explain actions = 1
select number as a, r.b from numbers(4) as l any inner join (
select number + 2 as b from numbers(3)
) as r on a = r.b where a != 1 and b != 2 settings enable_optimize_predicate_expression = 0" |
grep -o "Join\|Filter column: notEquals(number, 1)"
$CLICKHOUSE_CLIENT -q "
select number as a, r.b from numbers(4) as l any inner join (
select number + 2 as b from numbers(3)
) as r on a = r.b where a != 1 and b != 2 settings enable_optimize_predicate_expression = 0"
echo "> filter is pushed down before UNION"
$CLICKHOUSE_CLIENT -q "
explain select a, b from (
select number + 1 as a, number + 2 as b from numbers(2) union all select number + 1 as b, number + 2 as a from numbers(2)
) where a != 1 settings enable_optimize_predicate_expression = 0" |
grep -o "Union\|Filter"
$CLICKHOUSE_CLIENT -q "
select a, b from (
select number + 1 as a, number + 2 as b from numbers(2) union all select number + 1 as b, number + 2 as a from numbers(2)
) where a != 1 settings enable_optimize_predicate_expression = 0"