diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp index a444538c8ce..2694285c2c9 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp @@ -22,21 +22,6 @@ namespace DB::QueryPlanOptimizations { -static QueryPlan::Node * findReadingStep(QueryPlan::Node & node) -{ - IQueryPlanStep * step = node.step.get(); - if (auto * reading = typeid_cast(step)) - return &node; - - if (node.children.size() != 1) - return nullptr; - - if (typeid_cast(step) || typeid_cast(step)) - return findReadingStep(*node.children.front()); - - return nullptr; -} - /// This is a common DAG which is a merge of DAGs from Filter and Expression steps chain. /// Additionally, for all the Filter steps, we collect filter conditions into filter_nodes. /// Flag remove_last_filter_node is set in case if the last step is a Filter step and it should remove filter column. @@ -124,28 +109,109 @@ bool QueryDAG::build(QueryPlan::Node & node) return false; } -static bool canUseProjectionForReadingStep(ReadFromMergeTree * reading) +struct AggregateQueryDAG { - /// Probably some projection already was applied. - if (reading->hasAnalyzedResult()) - return false; + ActionsDAGPtr dag; + const ActionsDAG::Node * filter_node = nullptr; - if (reading->isQueryWithFinal()) - return false; + bool build(QueryPlan::Node & node) + { + QueryDAG query; + if (!query.build(node)) + return false; - if (reading->isQueryWithSampling()) - return false; + dag = std::move(query.dag); + auto filter_nodes = std::move(query.filter_nodes); - if (reading->isParallelReadingEnabled()) - return false; + if (!filter_nodes.empty()) + { + filter_node = filter_nodes.front(); + if (filter_nodes.size() > 1) + { + FunctionOverloadResolverPtr func_builder_and = + std::make_unique( + std::make_shared()); - // Currently projection don't support deduplication when moving parts between shards. - if (reading->getContext()->getSettingsRef().allow_experimental_query_deduplication) - return false; + filter_node = &dag->addFunction(func_builder_and, std::move(filter_nodes), {}); + } - return true; -} + dag->getOutputs().push_back(filter_node); + } + return true; + } +}; + +struct NormalQueryDAG +{ + ActionsDAGPtr dag; + bool need_remove_column = false; + const ActionsDAG::Node * filter_node = nullptr; + + bool build(QueryPlan::Node & node) + { + QueryDAG query; + if (!query.build(node)) + return false; + + dag = std::move(query.dag); + auto filter_nodes = std::move(query.filter_nodes); + need_remove_column = query.remove_last_filter_node; + + if (!filter_nodes.empty()) + { + auto & outputs = dag->getOutputs(); + filter_node = filter_nodes.back(); + + if (filter_nodes.size() > 1) + { + /// Add a conjunction of all the filters. + if (need_remove_column) + { + /// Last filter column is not needed; remove it right here + size_t pos = 0; + while (pos < outputs.size() && outputs[pos] != filter_node) + ++pos; + + if (pos < outputs.size()) + outputs.erase(outputs.begin() + pos); + } + else + { + /// Last filter is needed; we must replace it to constant 1, + /// As well as FilterStep does to make a compatible header. + for (auto & output : outputs) + { + if (output == filter_node) + { + ColumnWithTypeAndName col; + col.name = filter_node->result_name; + col.type = filter_node->result_type; + col.column = col.type->createColumnConst(1, 1); + output = &dag->addColumn(std::move(col)); + } + } + } + + FunctionOverloadResolverPtr func_builder_and = + std::make_unique( + std::make_shared()); + + filter_node = &dag->addFunction(func_builder_and, std::move(filter_nodes), {}); + outputs.insert(outputs.begin(), filter_node); + need_remove_column = true; + } + } + + if (dag) + { + dag->removeUnusedActions(); + // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Header {}, Query DAG: {}", header.dumpStructure(), dag->dumpDAG()); + } + + return true; + } +}; /// Required analysis info from aggregate projection. struct AggregateProjectionInfo @@ -380,110 +446,6 @@ bool areAggregatesMatch( return true; } -struct AggregateQueryDAG -{ - ActionsDAGPtr dag; - const ActionsDAG::Node * filter_node = nullptr; - - bool build(QueryPlan::Node & node) - { - QueryDAG query; - if (!query.build(node)) - return false; - - dag = std::move(query.dag); - auto filter_nodes = std::move(query.filter_nodes); - - if (!filter_nodes.empty()) - { - filter_node = filter_nodes.front(); - if (filter_nodes.size() > 1) - { - FunctionOverloadResolverPtr func_builder_and = - std::make_unique( - std::make_shared()); - - filter_node = &dag->addFunction(func_builder_and, std::move(filter_nodes), {}); - } - - dag->getOutputs().push_back(filter_node); - } - - return true; - } -}; - -struct NormalQueryDAG -{ - ActionsDAGPtr dag; - bool need_remove_column = false; - const ActionsDAG::Node * filter_node = nullptr; - - bool build(QueryPlan::Node & node) - { - QueryDAG query; - if (!query.build(node)) - return false; - - dag = std::move(query.dag); - auto filter_nodes = std::move(query.filter_nodes); - need_remove_column = query.remove_last_filter_node; - - if (!filter_nodes.empty()) - { - auto & outputs = dag->getOutputs(); - filter_node = filter_nodes.back(); - - if (filter_nodes.size() > 1) - { - /// Add a conjunction of all the filters. - if (need_remove_column) - { - /// Last filter column is not needed; remove it right here - size_t pos = 0; - while (pos < outputs.size() && outputs[pos] != filter_node) - ++pos; - - if (pos < outputs.size()) - outputs.erase(outputs.begin() + pos); - } - else - { - /// Last filter is needed; we must replace it to constant 1, - /// As well as FilterStep does to make a compatible header. - for (auto & output : outputs) - { - if (output == filter_node) - { - ColumnWithTypeAndName col; - col.name = filter_node->result_name; - col.type = filter_node->result_type; - col.column = col.type->createColumnConst(1, 1); - output = &dag->addColumn(std::move(col)); - } - } - } - - FunctionOverloadResolverPtr func_builder_and = - std::make_unique( - std::make_shared()); - - filter_node = &dag->addFunction(func_builder_and, std::move(filter_nodes), {}); - outputs.insert(outputs.begin(), filter_node); - need_remove_column = true; - } - } - - if (dag) - { - dag->removeUnusedActions(); - // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Header {}, Query DAG: {}", header.dumpStructure(), dag->dumpDAG()); - } - - return true; - } -}; - ActionsDAGPtr analyzeAggregateProjection( const AggregateProjectionInfo & info, const AggregateQueryDAG & query, @@ -797,6 +759,43 @@ static bool analyzeProjectionCandidate( return true; } +static QueryPlan::Node * findReadingStep(QueryPlan::Node & node) +{ + IQueryPlanStep * step = node.step.get(); + if (auto * reading = typeid_cast(step)) + return &node; + + if (node.children.size() != 1) + return nullptr; + + if (typeid_cast(step) || typeid_cast(step)) + return findReadingStep(*node.children.front()); + + return nullptr; +} + +static bool canUseProjectionForReadingStep(ReadFromMergeTree * reading) +{ + /// Probably some projection already was applied. + if (reading->hasAnalyzedResult()) + return false; + + if (reading->isQueryWithFinal()) + return false; + + if (reading->isQueryWithSampling()) + return false; + + if (reading->isParallelReadingEnabled()) + return false; + + // Currently projection don't support deduplication when moving parts between shards. + if (reading->getContext()->getSettingsRef().allow_experimental_query_deduplication) + return false; + + return true; +} + bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) { if (node.children.size() != 1)