From 3a020d2dd5c4ffda10fb4dd79509f5e04f45e692 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 11 Feb 2021 11:49:12 +0300 Subject: [PATCH] filter push down for Aggregating --- src/Processors/QueryPlan/AggregatingStep.h | 2 + .../QueryPlan/Optimizations/Optimizations.h | 7 +- .../Optimizations/filterPushDown.cpp | 77 +++++++++++++++++++ 3 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 src/Processors/QueryPlan/Optimizations/filterPushDown.cpp diff --git a/src/Processors/QueryPlan/AggregatingStep.h b/src/Processors/QueryPlan/AggregatingStep.h index 853173895b3..6be92394fab 100644 --- a/src/Processors/QueryPlan/AggregatingStep.h +++ b/src/Processors/QueryPlan/AggregatingStep.h @@ -32,6 +32,8 @@ public: void describeActions(FormatSettings &) const override; void describePipeline(FormatSettings & settings) const override; + const Aggregator::Params & getParams() const { return params; } + private: Aggregator::Params params; bool final; diff --git a/src/Processors/QueryPlan/Optimizations/Optimizations.h b/src/Processors/QueryPlan/Optimizations/Optimizations.h index 454eab9649a..be7f81e5db0 100644 --- a/src/Processors/QueryPlan/Optimizations/Optimizations.h +++ b/src/Processors/QueryPlan/Optimizations/Optimizations.h @@ -38,14 +38,19 @@ size_t trySplitFilter(QueryPlan::Node * node, QueryPlan::Nodes & nodes); /// Replace chain `FilterStep -> ExpressionStep` to single FilterStep size_t tryMergeExpressions(QueryPlan::Node * parent_node, QueryPlan::Nodes &); +/// Move FilterStep down if possible. +/// May split FilterStep and push down only part of it. +size_t tryPushDownLimit(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes); + inline const auto & getOptimizations() { - static const std::array optimizations = + static const std::array optimizations = {{ {tryLiftUpArrayJoin, "liftUpArrayJoin"}, {tryPushDownLimit, "pushDownLimit"}, {trySplitFilter, "splitFilter"}, {tryMergeExpressions, "mergeExpressions"}, + {tryPushDownLimit, "pushDownFilter"}, }}; return optimizations; diff --git a/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp b/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp new file mode 100644 index 00000000000..82704bcbce9 --- /dev/null +++ b/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp @@ -0,0 +1,77 @@ +#include +#include +#include +#include +#include +#include +#include + +namespace DB::ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +namespace DB::QueryPlanOptimizations +{ + +size_t tryPushDownLimit(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes) +{ + if (parent_node->children.size() != 1) + return 0; + + QueryPlan::Node * child_node = parent_node->children.front(); + + auto & parent = parent_node->step; + auto & child = child_node->step; + auto * filter = typeid_cast(parent.get()); + + if (!filter) + return 0; + + const auto & expression = filter->getExpression(); + const auto & filter_column_name = filter->getFilterColumnName(); + bool removes_filter = filter->removesFilterColumn(); + + if (auto * aggregating = typeid_cast(child.get())) + { + const auto & params = aggregating->getParams(); + + Names keys; + keys.reserve(params.keys.size()); + for (auto pos : params.keys) + keys.push_back(params.src_header.getByPosition(pos).name); + + if (auto split_filter = expression->splitActionsForFilter(filter_column_name, removes_filter, keys)) + { + auto it = expression->getIndex().find(filter_column_name); + if (it == expression->getIndex().end()) + { + if (!removes_filter) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Filter column {} was removed from ActionsDAG but it is needed in result. DAG:\n{}", + filter_column_name, expression->dumpDAG()); + + parent = std::make_unique(child->getOutputStream(), expression); + } + + /// Add new Filter step before Aggregating. + /// Expression/Filter -> Aggregating -> Something + auto & node = nodes.emplace_back(); + node.children.swap(child_node->children); + child_node->children.emplace_back(&node); + /// Expression/Filter -> Aggregating -> Filter -> Something + + /// New filter column is added to the end. + auto split_filter_column_name = (*split_filter->getIndex().rbegin())->result_name; + node.step = std::make_unique( + node.children.at(0)->step->getOutputStream(), + std::move(split_filter), std::move(split_filter_column_name), true); + + return 3; + } + } + + return 0; +} + +}