filter push down for Aggregating

This commit is contained in:
Nikolai Kochetov 2021-02-11 11:49:12 +03:00
parent a83885392e
commit 3a020d2dd5
3 changed files with 85 additions and 1 deletions

View File

@ -32,6 +32,8 @@ public:
void describeActions(FormatSettings &) const override;
void describePipeline(FormatSettings & settings) const override;
/// Read-only access to the aggregation parameters held in the `params` member of this step.
const Aggregator::Params & getParams() const { return params; }
private:
Aggregator::Params params;
bool final;

View File

@ -38,14 +38,19 @@ size_t trySplitFilter(QueryPlan::Node * node, QueryPlan::Nodes & nodes);
/// Replace the chain `FilterStep -> ExpressionStep` with a single FilterStep.
size_t tryMergeExpressions(QueryPlan::Node * parent_node, QueryPlan::Nodes &);
/// Move FilterStep down if possible.
/// May split FilterStep and push down only part of it.
/// NOTE(review): the name says "Limit", but this comment describes FilterStep and the
/// optimizations table registers the function under "pushDownFilter" (next to an entry
/// with the same function name registered as "pushDownLimit"). Looks like a copy-paste
/// name - confirm whether it was meant to be tryPushDownFilter.
size_t tryPushDownLimit(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
inline const auto & getOptimizations()
{
static const std::array<Optimization, 4> optimizations =
static const std::array<Optimization, 5> optimizations =
{{
{tryLiftUpArrayJoin, "liftUpArrayJoin"},
{tryPushDownLimit, "pushDownLimit"},
{trySplitFilter, "splitFilter"},
{tryMergeExpressions, "mergeExpressions"},
{tryPushDownLimit, "pushDownFilter"},
}};
return optimizations;

View File

@ -0,0 +1,77 @@
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Interpreters/ActionsDAG.h>
#include <Common/typeid_cast.h>
namespace DB::ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace DB::QueryPlanOptimizations
{
/// Try to push a part of a FilterStep below the AggregatingStep that is its only child.
///
/// The rewrite is attempted only for the chain `FilterStep -> AggregatingStep`.
/// The filter's ActionsDAG is asked (via splitActionsForFilter) to split off a filter,
/// given the names of the aggregation key columns taken from the source header.
/// If it returns a DAG, a new plan node holding a FilterStep built from it is inserted
/// between the AggregatingStep node and its former children.
///
/// Returns 0 if the plan was left untouched and 3 after a successful rewrite
/// (presumably the number of plan nodes affected - TODO confirm against the optimizer driver).
/// Throws LOGICAL_ERROR if, after the split, the filter column is missing from the
/// remaining DAG's index although the FilterStep does not remove its filter column.
///
/// NOTE(review): despite the "Limit" in its name, this function only handles FilterStep
/// and is registered as "pushDownFilter" - confirm the intended name.
size_t tryPushDownLimit(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes)
{
    /// Only a node with exactly one child can be part of the chain we are looking for.
    if (parent_node->children.size() != 1)
        return 0;

    QueryPlan::Node * aggregating_node = parent_node->children.front();

    auto & top_step = parent_node->step;
    auto & bottom_step = aggregating_node->step;

    auto * filter_step = typeid_cast<FilterStep *>(top_step.get());
    if (filter_step == nullptr)
        return 0;

    /// These references point into the FilterStep owned by `top_step`;
    /// they must not be used after `top_step` is reassigned below.
    const auto & filter_dag = filter_step->getExpression();
    const auto & filter_column = filter_step->getFilterColumnName();
    bool filter_column_is_removed = filter_step->removesFilterColumn();

    auto * aggregating_step = typeid_cast<AggregatingStep *>(bottom_step.get());
    if (aggregating_step == nullptr)
        return 0;

    /// Collect the names of the aggregation key columns from the source header.
    const auto & aggregator_params = aggregating_step->getParams();

    Names key_names;
    key_names.reserve(aggregator_params.keys.size());
    for (auto key_position : aggregator_params.keys)
        key_names.push_back(aggregator_params.src_header.getByPosition(key_position).name);

    auto pushed_dag = filter_dag->splitActionsForFilter(filter_column, filter_column_is_removed, key_names);
    if (!pushed_dag)
        return 0;

    const bool filter_column_left_in_dag = filter_dag->getIndex().find(filter_column) != filter_dag->getIndex().end();
    if (!filter_column_left_in_dag)
    {
        if (!filter_column_is_removed)
            throw Exception(ErrorCodes::LOGICAL_ERROR,
                            "Filter column {} was removed from ActionsDAG but it is needed in result. DAG:\n{}",
                            filter_column, filter_dag->dumpDAG());

        /// Nothing is left to filter by at this level: keep the remaining actions as a plain ExpressionStep.
        /// This destroys the old FilterStep, so `filter_dag` and `filter_column` dangle from here on.
        top_step = std::make_unique<ExpressionStep>(bottom_step->getOutputStream(), filter_dag);
    }

    /// Add new Filter step before Aggregating.
    /// Before: Expression/Filter -> Aggregating -> Something
    auto & pushed_node = nodes.emplace_back();
    pushed_node.children.swap(aggregating_node->children);
    aggregating_node->children.emplace_back(&pushed_node);
    /// After:  Expression/Filter -> Aggregating -> Filter -> Something

    /// New filter column is added to the end of the split DAG's index, so its name is taken from there.
    auto pushed_filter_column = (*pushed_dag->getIndex().rbegin())->result_name;

    /// The last argument is `true`: presumably "remove the filter column" - TODO confirm against FilterStep.
    pushed_node.step = std::make_unique<FilterStep>(
        pushed_node.children.at(0)->step->getOutputStream(),
        std::move(pushed_dag),
        std::move(pushed_filter_column),
        true);

    return 3;
}
}