From e9f72138589c0bb1d6aaf413c22a90a71c5135aa Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 25 Mar 2021 21:11:54 +0300 Subject: [PATCH] Filter push down for LEFT/INNER JOIN, CreatingSets, Union. --- contrib/libpq | 2 +- src/Interpreters/HashJoin.h | 2 + src/Interpreters/IJoin.h | 4 + src/Interpreters/JoinSwitcher.h | 2 + src/Interpreters/MergeJoin.h | 1 + src/Processors/QueryPlan/ExpressionStep.h | 2 + .../Optimizations/filterPushDown.cpp | 100 +++++++++++++++++- src/Processors/QueryPlan/UnionStep.cpp | 5 + src/Processors/QueryPlan/UnionStep.h | 2 + .../01655_plan_optimizations.reference | 23 ++++ .../0_stateless/01655_plan_optimizations.sh | 46 ++++++++ 11 files changed, 185 insertions(+), 4 deletions(-) diff --git a/contrib/libpq b/contrib/libpq index c7624588ddd..1f9c286dba6 160000 --- a/contrib/libpq +++ b/contrib/libpq @@ -1 +1 @@ -Subproject commit c7624588ddd84f153dd5990e81b886e4568bddde +Subproject commit 1f9c286dba60809edb64e384d6727d80d269b6cf diff --git a/src/Interpreters/HashJoin.h b/src/Interpreters/HashJoin.h index b726de44f3a..7f70c397db9 100644 --- a/src/Interpreters/HashJoin.h +++ b/src/Interpreters/HashJoin.h @@ -134,6 +134,8 @@ class HashJoin : public IJoin public: HashJoin(std::shared_ptr table_join_, const Block & right_sample_block, bool any_take_last_row_ = false); + const TableJoin & getTableJoin() const override { return *table_join; } + /** Add block of data from right hand of JOIN to the map. * Returns false, if some limit was exceeded and you should not insert more data. */ diff --git a/src/Interpreters/IJoin.h b/src/Interpreters/IJoin.h index ade6eaa0cc9..b326f06fa7e 100644 --- a/src/Interpreters/IJoin.h +++ b/src/Interpreters/IJoin.h @@ -14,11 +14,15 @@ class Block; struct ExtraBlock; using ExtraBlockPtr = std::shared_ptr; +class TableJoin; + class IJoin { public: virtual ~IJoin() = default; + virtual const TableJoin & getTableJoin() const = 0; + /// Add block of data from right hand of JOIN. 
/// @returns false, if some limit was exceeded and you should not insert more data. virtual bool addJoinedBlock(const Block & block, bool check_limits = true) = 0; diff --git a/src/Interpreters/JoinSwitcher.h b/src/Interpreters/JoinSwitcher.h index 1fd719cd5dc..75ff7bb9b2c 100644 --- a/src/Interpreters/JoinSwitcher.h +++ b/src/Interpreters/JoinSwitcher.h @@ -19,6 +19,8 @@ class JoinSwitcher : public IJoin public: JoinSwitcher(std::shared_ptr table_join_, const Block & right_sample_block_); + const TableJoin & getTableJoin() const override { return *table_join; } + /// Add block of data from right hand of JOIN into current join object. /// If join-in-memory memory limit exceeded switches to join-on-disk and continue with it. /// @returns false, if join-on-disk disk limit exceeded diff --git a/src/Interpreters/MergeJoin.h b/src/Interpreters/MergeJoin.h index a13d0304907..caae648e652 100644 --- a/src/Interpreters/MergeJoin.h +++ b/src/Interpreters/MergeJoin.h @@ -23,6 +23,7 @@ class MergeJoin : public IJoin public: MergeJoin(std::shared_ptr table_join_, const Block & right_sample_block); + const TableJoin & getTableJoin() const override { return *table_join; } bool addJoinedBlock(const Block & block, bool check_limits) override; void joinBlock(Block &, ExtraBlockPtr & not_processed) override; void joinTotals(Block &) const override; diff --git a/src/Processors/QueryPlan/ExpressionStep.h b/src/Processors/QueryPlan/ExpressionStep.h index 7c8db2f2687..bcc1b0ef7b6 100644 --- a/src/Processors/QueryPlan/ExpressionStep.h +++ b/src/Processors/QueryPlan/ExpressionStep.h @@ -45,6 +45,8 @@ public: void transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) override; + const JoinPtr & getJoin() const { return join; } + private: JoinPtr join; bool has_non_joined_rows; diff --git a/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp b/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp index 552720fa1a4..2eae6d23b5c 100644 --- 
a/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp +++ b/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -11,8 +12,10 @@ #include #include #include +#include #include #include +#include #include #include @@ -82,7 +85,7 @@ static size_t tryAddNewFilterStep( return 3; } -static Names getAggregatinKeys(const Aggregator::Params & params) +static Names getAggregatingKeys(const Aggregator::Params & params) { Names keys; keys.reserve(params.keys.size()); @@ -112,17 +115,36 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes if (auto * aggregating = typeid_cast(child.get())) { const auto & params = aggregating->getParams(); - Names keys = getAggregatinKeys(params); + Names keys = getAggregatingKeys(params); if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, keys)) return updated_steps; } + if (typeid_cast(child.get())) + { + /// CreatingSets does not change header. + /// We can push down filter and update header. + /// - Something + /// Filter - CreatingSets - CreatingSet + /// - CreatingSet + auto input_streams = child->getInputStreams(); + input_streams.front() = filter->getOutputStream(); + child = std::make_unique(input_streams); + std::swap(parent, child); + std::swap(parent_node->children, child_node->children); + std::swap(parent_node->children.front(), child_node->children.front()); + /// - Filter - Something + /// CreatingSets - CreatingSet + /// - CreatingSet + return 2; + } + if (auto * totals_having = typeid_cast(child.get())) { /// If totals step has HAVING expression, skip it for now. /// TODO: - /// We can merge HAING expression with current filer. + /// We can merge HAVING expression with current filter. /// Also, we can push down part of HAVING which depend only on aggregation keys. 
if (totals_having->getActions()) return 0; @@ -168,6 +190,36 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes return updated_steps; } + if (auto * join = typeid_cast(child.get())) + { + const auto & table_join = join->getJoin()->getTableJoin(); + /// Push down is for left table only. We need to update JoinStep for push down into right. + /// Only inner and left join are supported. Other types may generate default values for left table keys. + /// So, if we push down a condition like `key != 0`, not all rows may be filtered. + if (table_join.kind() == ASTTableJoin::Kind::Inner || table_join.kind() == ASTTableJoin::Kind::Left) + { + const auto & left_header = join->getInputStreams().front().header; + const auto & res_header = join->getOutputStream().header; + Names allowed_keys; + for (const auto & name : table_join.keyNamesLeft()) + { + /// Skip key if it is renamed. + /// I don't know if it is possible. Just in case. + if (!left_header.has(name) || !res_header.has(name)) + continue; + + /// Skip if type is changed. Push down expression expects equal types. + if (!left_header.getByName(name).type->equals(*res_header.getByName(name).type)) + continue; + + allowed_keys.push_back(name); + } + + if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_keys)) + return updated_steps; + } + } + /// TODO. /// We can filter earlier if expression does not depend on WITH FILL columns. /// But we cannot just push down condition, because other column may be filled with defaults. @@ -193,6 +245,48 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes return updated_steps; } + if (auto * union_step = typeid_cast(child.get())) + { + /// Union does not change header. + /// We can push down filter and update header. 
+ auto union_input_streams = child->getInputStreams(); + for (auto & input_stream : union_input_streams) + input_stream.header = filter->getOutputStream().header; + + /// - Something + /// Filter - Union - Something + /// - Something + + child = std::make_unique(union_input_streams, union_step->getMaxThreads()); + + std::swap(parent, child); + std::swap(parent_node->children, child_node->children); + std::swap(parent_node->children.front(), child_node->children.front()); + + /// - Filter - Something + /// Union - Something + /// - Something + + for (size_t i = 1; i < parent_node->children.size(); ++i) + { + auto & filter_node = nodes.emplace_back(); + filter_node.children.push_back(parent_node->children[i]); + parent_node->children[i] = &filter_node; + + filter_node.step = std::make_unique( + filter_node.children.front()->step->getOutputStream(), + filter->getExpression()->clone(), + filter->getFilterColumnName(), + filter->removesFilterColumn()); + } + + /// - Filter - Something + /// Union - Filter - Something + /// - Filter - Something + + return 3; + } + return 0; } diff --git a/src/Processors/QueryPlan/UnionStep.cpp b/src/Processors/QueryPlan/UnionStep.cpp index a697517b5e8..7403dd0a12a 100644 --- a/src/Processors/QueryPlan/UnionStep.cpp +++ b/src/Processors/QueryPlan/UnionStep.cpp @@ -6,6 +6,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + static Block checkHeaders(const DataStreams & input_streams) { if (input_streams.empty()) diff --git a/src/Processors/QueryPlan/UnionStep.h b/src/Processors/QueryPlan/UnionStep.h index 738ada4a565..81bd033d045 100644 --- a/src/Processors/QueryPlan/UnionStep.h +++ b/src/Processors/QueryPlan/UnionStep.h @@ -17,6 +17,8 @@ public: void describePipeline(FormatSettings & settings) const override; + size_t getMaxThreads() const { return max_threads; } + private: Block header; size_t max_threads; diff --git a/tests/queries/0_stateless/01655_plan_optimizations.reference 
b/tests/queries/0_stateless/01655_plan_optimizations.reference index 99b32b74ca7..22f5a2e73e3 100644 --- a/tests/queries/0_stateless/01655_plan_optimizations.reference +++ b/tests/queries/0_stateless/01655_plan_optimizations.reference @@ -123,3 +123,26 @@ Filter column: notEquals(y, 2) 3 10 0 37 +> filter is pushed down before CreatingSets +CreatingSets +Filter +Filter +1 +3 +> one condition of filter is pushed down before LEFT JOIN +Join +Filter column: notEquals(number, 1) +Join +0 0 +3 3 +> one condition of filter is pushed down before INNER JOIN +Join +Filter column: notEquals(number, 1) +Join +3 3 +> filter is pushed down before UNION +Union +Filter +Filter +2 3 +2 3 diff --git a/tests/queries/0_stateless/01655_plan_optimizations.sh b/tests/queries/0_stateless/01655_plan_optimizations.sh index 3148dc4a597..148e6157773 100755 --- a/tests/queries/0_stateless/01655_plan_optimizations.sh +++ b/tests/queries/0_stateless/01655_plan_optimizations.sh @@ -150,3 +150,49 @@ $CLICKHOUSE_CLIENT -q " select * from ( select y, sum(x) from (select number as x, number % 4 as y from numbers(10)) group by y with totals ) where y != 2" + +echo "> filter is pushed down before CreatingSets" +$CLICKHOUSE_CLIENT -q " + explain select number from ( + select number from numbers(5) where number in (select 1 + number from numbers(3)) + ) where number != 2 settings enable_optimize_predicate_expression=0" | + grep -o "CreatingSets\|Filter" +$CLICKHOUSE_CLIENT -q " + select number from ( + select number from numbers(5) where number in (select 1 + number from numbers(3)) + ) where number != 2 settings enable_optimize_predicate_expression=0" + +echo "> one condition of filter is pushed down before LEFT JOIN" +$CLICKHOUSE_CLIENT -q " + explain actions = 1 + select number as a, r.b from numbers(4) as l any left join ( + select number + 2 as b from numbers(3) + ) as r on a = r.b where a != 1 and b != 2 settings enable_optimize_predicate_expression = 0" | + grep -o "Join\|Filter column: 
notEquals(number, 1)" +$CLICKHOUSE_CLIENT -q " + select number as a, r.b from numbers(4) as l any left join ( + select number + 2 as b from numbers(3) + ) as r on a = r.b where a != 1 and b != 2 settings enable_optimize_predicate_expression = 0" + +echo "> one condition of filter is pushed down before INNER JOIN" +$CLICKHOUSE_CLIENT -q " + explain actions = 1 + select number as a, r.b from numbers(4) as l any inner join ( + select number + 2 as b from numbers(3) + ) as r on a = r.b where a != 1 and b != 2 settings enable_optimize_predicate_expression = 0" | + grep -o "Join\|Filter column: notEquals(number, 1)" +$CLICKHOUSE_CLIENT -q " + select number as a, r.b from numbers(4) as l any inner join ( + select number + 2 as b from numbers(3) + ) as r on a = r.b where a != 1 and b != 2 settings enable_optimize_predicate_expression = 0" + +echo "> filter is pushed down before UNION" +$CLICKHOUSE_CLIENT -q " + explain select a, b from ( + select number + 1 as a, number + 2 as b from numbers(2) union all select number + 1 as b, number + 2 as a from numbers(2) + ) where a != 1 settings enable_optimize_predicate_expression = 0" | + grep -o "Union\|Filter" +$CLICKHOUSE_CLIENT -q " + select a, b from ( + select number + 1 as a, number + 2 as b from numbers(2) union all select number + 1 as b, number + 2 as a from numbers(2) + ) where a != 1 settings enable_optimize_predicate_expression = 0"