From 90c7cf5a5293a32654e97cc8b4f8cb1d2090d3be Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 12 Feb 2021 18:24:31 +0300 Subject: [PATCH] Push down for ArrayJoin --- .../Optimizations/filterPushDown.cpp | 116 ++++++++++++------ 1 file changed, 80 insertions(+), 36 deletions(-) diff --git a/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp b/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp index ec005e59729..98e923249f3 100644 --- a/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp +++ b/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp @@ -3,7 +3,9 @@ #include #include #include +#include #include +#include #include #include @@ -15,6 +17,68 @@ namespace DB::ErrorCodes namespace DB::QueryPlanOptimizations { +static size_t tryAddNewFilterStep( + QueryPlan::Node * parent_node, + QueryPlan::Nodes & nodes, + const Names & allowed_inputs) +{ + QueryPlan::Node * child_node = parent_node->children.front(); + + auto & parent = parent_node->step; + auto & child = child_node->step; + + auto * filter = static_cast(parent.get()); + const auto & expression = filter->getExpression(); + const auto & filter_column_name = filter->getFilterColumnName(); + bool removes_filter = filter->removesFilterColumn(); + + // std::cerr << "Filter: \n" << expression->dumpDAG() << std::endl; + + auto split_filter = expression->splitActionsForFilter(filter_column_name, removes_filter, allowed_inputs); + if (!split_filter) + return 0; + + // std::cerr << "===============\n" << expression->dumpDAG() << std::endl; + // std::cerr << "---------------\n" << split_filter->dumpDAG() << std::endl; + + const auto & index = expression->getIndex(); + auto it = index.begin(); + for (; it != index.end(); ++it) + if ((*it)->result_name == filter_column_name) + break; + + if (it == expression->getIndex().end()) + { + if (!removes_filter) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Filter column {} was removed from ActionsDAG but it is needed in result. DAG:\n{}", + filter_column_name, expression->dumpDAG()); + + std::cerr << "replacing to expr because filter " << filter_column_name << " was removed\n"; + parent = std::make_unique(child->getOutputStream(), expression); + } + else if ((*it)->column && isColumnConst(*(*it)->column)) + { + std::cerr << "replacing to expr because filter is const\n"; + parent = std::make_unique(child->getOutputStream(), expression); + } + + /// Add new Filter step before Aggregating. + /// Expression/Filter -> Aggregating -> Something + auto & node = nodes.emplace_back(); + node.children.swap(child_node->children); + child_node->children.emplace_back(&node); + /// Expression/Filter -> Aggregating -> Filter -> Something + + /// New filter column is added to the end. + auto split_filter_column_name = (*split_filter->getIndex().rbegin())->result_name; + node.step = std::make_unique( + node.children.at(0)->step->getOutputStream(), + std::move(split_filter), std::move(split_filter_column_name), true); + + return 3; +} + size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes) { if (parent_node->children.size() != 1) @@ -29,10 +93,6 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes if (!filter) return 0; - const auto & expression = filter->getExpression(); - const auto & filter_column_name = filter->getFilterColumnName(); - bool removes_filter = filter->removesFilterColumn(); - if (auto * aggregating = typeid_cast(child.get())) { const auto & params = aggregating->getParams(); @@ -42,42 +102,26 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes for (auto pos : params.keys) keys.push_back(params.src_header.getByPosition(pos).name); - // std::cerr << "Filter: \n" << expression->dumpDAG() << std::endl; - if (auto split_filter = expression->splitActionsForFilter(filter_column_name, removes_filter, keys)) - { - // std::cerr << "===============\n" << expression->dumpDAG() << std::endl; - // std::cerr << "---------------\n" << split_filter->dumpDAG() << std::endl; + if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, keys)) + return updated_steps; + } - auto it = expression->getIndex().find(filter_column_name); - if (it == expression->getIndex().end()) - { - if (!removes_filter) - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Filter column {} was removed from ActionsDAG but it is needed in result. DAG:\n{}", - filter_column_name, expression->dumpDAG()); + if (auto * array_join = typeid_cast(child.get())) + { + const auto & array_join_actions = array_join->arrayJoin(); + const auto & keys = array_join_actions->columns; + const auto & array_join_header = array_join->getInputStreams().front().header; - parent = std::make_unique(child->getOutputStream(), expression); - } - else if ((*it)->column && isColumnConst(*(*it)->column)) - { - parent = std::make_unique(child->getOutputStream(), expression); - } + Names allowed_inputs; + for (const auto & column : array_join_header) + if (keys.count(column.name) == 0) + allowed_inputs.push_back(column.name); - /// Add new Filter step before Aggregating. - /// Expression/Filter -> Aggregating -> Something - auto & node = nodes.emplace_back(); - node.children.swap(child_node->children); - child_node->children.emplace_back(&node); - /// Expression/Filter -> Aggregating -> Filter -> Something + for (const auto & name : allowed_inputs) + std::cerr << name << std::endl; - /// New filter column is added to the end. - auto split_filter_column_name = (*split_filter->getIndex().rbegin())->result_name; - node.step = std::make_unique( - node.children.at(0)->step->getOutputStream(), - std::move(split_filter), std::move(split_filter_column_name), true); - - return 3; - } + if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_inputs)) + return updated_steps; } return 0;