diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp index ea81208df8e..ce7818a3298 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp @@ -257,7 +257,7 @@ bool areAggregatesMatch( ActionsDAGPtr analyzeAggregateProjection( const AggregateProjectionInfo & info, - ActionsDAG & query_dag, + const ActionsDAG & query_dag, const ActionsDAG::Node * filter_node, const Names & keys, const AggregateDescriptions & aggregates) @@ -411,7 +411,7 @@ static void appendExpression(ActionsDAGPtr & dag, const ActionsDAGPtr & expressi /// This function builds a common DAG which is a merge of DAGs from Filter and Expression steps chain. /// Additionally, for all the Filter steps, we collect filter conditions into filter_nodes. /// Flag need_remove_column is set in case if the last step is a Filter step and it should remove filter column. -static bool buildAggregatingDAG( +static bool buildQueryDAG( QueryPlan::Node & node, ActionsDAGPtr & dag, ActionsDAG::NodeRawConstPtrs & filter_nodes, @@ -448,7 +448,7 @@ static bool buildAggregatingDAG( if (node.children.size() != 1) return false; - if (!buildAggregatingDAG(*node.children.front(), dag, filter_nodes, need_remove_column)) + if (!buildQueryDAG(*node.children.front(), dag, filter_nodes, need_remove_column)) return false; if (auto * expression = typeid_cast(step)) @@ -503,6 +503,131 @@ bool canUseProjectionForReadingStep(ReadFromMergeTree * reading) return true; } +struct MinMaxProjectionCandidate +{ + AggregateProjectionCandidate candidate; + Block minmax_count_projection_block; + MergeTreeData::DataPartsVector minmax_projection_normal_parts; +}; + +struct AggregateProjectionCandidates +{ + std::vector real; + std::optional minmax_projection; + bool has_filter = false; +}; + +AggregateProjectionCandidates getAggregateProjectionCandidates( + QueryPlan::Node & node, + AggregatingStep & aggregating, + ReadFromMergeTree & reading, + const std::shared_ptr & max_added_blocks) +{ + const auto & keys = aggregating.getParams().keys; + const auto & aggregates = aggregating.getParams().aggregates; + Block key_virtual_columns = reading.getMergeTreeData().getSampleBlockWithVirtualColumns(); + + AggregateProjectionCandidates candidates; + + const auto & parts = reading.getParts(); + const auto & query_info = reading.getQueryInfo(); + + const auto metadata = reading.getStorageMetadata(); + ContextPtr context = reading.getContext(); + + const auto & projections = metadata->projections; + std::vector agg_projections; + for (const auto & projection : projections) + if (projection.type == ProjectionDescription::Type::Aggregate) + agg_projections.push_back(&projection); + + bool can_use_minmax_projection = metadata->minmax_count_projection && !reading.getMergeTreeData().has_lightweight_delete_parts.load(); + + if (!can_use_minmax_projection && agg_projections.empty()) + return candidates; + + // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Has agg projection"); + + ActionsDAGPtr dag; + bool need_remove_column = false; // not used here + ActionsDAG::NodeRawConstPtrs filter_nodes; + if (!buildQueryDAG(*node.children.front(), dag, filter_nodes, need_remove_column)) + return candidates; + + // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Query DAG: {}", dag->dumpDAG()); + + const ActionsDAG::Node * filter_node = nullptr; + if (!filter_nodes.empty()) + { + filter_node = filter_nodes.front(); + if (filter_nodes.size() > 1) + { + FunctionOverloadResolverPtr func_builder_and = + std::make_unique( + std::make_shared()); + + filter_node = &dag->addFunction(func_builder_and, std::move(filter_nodes), {}); + } + + dag->getOutputs().push_back(filter_node); + } + candidates.has_filter = filter_node; + + if (can_use_minmax_projection) + { + const auto * projection = &*(metadata->minmax_count_projection); + // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Try projection {}", projection->name); + auto info = getAggregatingProjectionInfo(*projection, context, metadata, key_virtual_columns); + // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection DAG {}", info.before_aggregation->dumpDAG()); + if (auto proj_dag = analyzeAggregateProjection(info, *dag, filter_node, keys, aggregates)) + { + // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection analyzed DAG {}", proj_dag->dumpDAG()); + AggregateProjectionCandidate candidate{.info = std::move(info), .dag = std::move(proj_dag)}; + MergeTreeData::DataPartsVector minmax_projection_normal_parts; + + auto block = reading.getMergeTreeData().getMinMaxCountProjectionBlock( + metadata, + candidate.dag->getRequiredColumnsNames(), + filter_node != nullptr, + query_info, + parts, + minmax_projection_normal_parts, + max_added_blocks.get(), + context); + + if (block) + { + MinMaxProjectionCandidate minmax; + minmax.candidate = std::move(candidate); + minmax.minmax_count_projection_block = std::move(block); + minmax.minmax_projection_normal_parts = std::move(minmax_projection_normal_parts); + minmax.candidate.projection = projection; + candidates.minmax_projection.emplace(std::move(minmax)); + } + } + } + + if (!candidates.minmax_projection) + { + candidates.real.reserve(agg_projections.size()); + for (const auto * projection : agg_projections) + { + // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Try projection {}", projection->name); + auto info = getAggregatingProjectionInfo(*projection, context, metadata, key_virtual_columns); + // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection DAG {}", info.before_aggregation->dumpDAG()); + if (auto proj_dag = analyzeAggregateProjection(info, *dag, filter_node, keys, aggregates)) + { + // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection analyzed DAG {}", proj_dag->dumpDAG()); + AggregateProjectionCandidate candidate{.info = std::move(info), .dag = std::move(proj_dag)}; + candidate.projection = projection; + candidates.real.emplace_back(std::move(candidate)); + } + } + } + + return candidates; +} + bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) { if (node.children.size() != 1) @@ -527,123 +652,28 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) return false; const auto metadata = reading->getStorageMetadata(); - const auto & projections = metadata->projections; - - bool can_use_minmax_projection = metadata->minmax_count_projection && !reading->getMergeTreeData().has_lightweight_delete_parts.load(); - - std::vector agg_projections; - for (const auto & projection : projections) - if (projection.type == ProjectionDescription::Type::Aggregate) - agg_projections.push_back(&projection); - - if (!can_use_minmax_projection && agg_projections.empty()) - return false; - - // LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Has agg projection"); - - ActionsDAGPtr dag; - bool need_remove_column = false; - ActionsDAG::NodeRawConstPtrs filter_nodes; - if (!buildAggregatingDAG(*node.children.front(), dag, filter_nodes, need_remove_column)) - return false; - - LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Query DAG: {}", dag->dumpDAG()); - - const ActionsDAG::Node * filter_node = nullptr; - if (!filter_nodes.empty()) - { - filter_node = filter_nodes.front(); - if (filter_nodes.size() > 1) - { - FunctionOverloadResolverPtr func_builder_and = - std::make_unique( - std::make_shared()); - - filter_node = &dag->addFunction(func_builder_and, std::move(filter_nodes), {}); - } - - dag->getOutputs().push_back(filter_node); - } - ContextPtr context = reading->getContext(); - const auto & keys = aggregating->getParams().keys; - const auto & aggregates = aggregating->getParams().aggregates; - Block key_virtual_columns = reading->getMergeTreeData().getSampleBlockWithVirtualColumns(); - - std::vector candidates; - std::optional minmax_projection; - Block minmax_count_projection_block; - MergeTreeData::DataPartsVector minmax_projection_normal_parts; - - const auto & parts = reading->getParts(); - const auto & query_info = reading->getQueryInfo(); - auto query_info_copy = query_info; - query_info_copy.prewhere_info = nullptr; std::shared_ptr max_added_blocks; - - AggregateProjectionCandidate * best_candidate = nullptr; - - if (can_use_minmax_projection) - { - const auto * projection = &*(metadata->minmax_count_projection); - LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Try projection {}", projection->name); - auto info = getAggregatingProjectionInfo(*projection, context, metadata, key_virtual_columns); - LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection DAG {}", info.before_aggregation->dumpDAG()); - if (auto proj_dag = analyzeAggregateProjection(info, *dag, filter_node, keys, aggregates)) - { - LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection analyzed DAG {}", proj_dag->dumpDAG()); - minmax_projection.emplace(AggregateProjectionCandidate{.info = std::move(info), .dag = std::move(proj_dag)}); - minmax_projection->projection = projection; - - minmax_count_projection_block = reading->getMergeTreeData().getMinMaxCountProjectionBlock( - metadata, - minmax_projection->dag->getRequiredColumnsNames(), - filter_node != nullptr, - query_info, - parts, - minmax_projection_normal_parts, - max_added_blocks.get(), - context); - - if (!minmax_count_projection_block) - minmax_projection.reset(); - else - best_candidate = &*minmax_projection; - } - } - - if (!minmax_projection) - { - candidates.reserve(agg_projections.size()); - for (const auto * projection : agg_projections) - { - - LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Try projection {}", projection->name); - auto info = getAggregatingProjectionInfo(*projection, context, metadata, key_virtual_columns); - LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection DAG {}", info.before_aggregation->dumpDAG()); - if (auto proj_dag = analyzeAggregateProjection(info, *dag, filter_node, keys, aggregates)) - { - LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection analyzed DAG {}", proj_dag->dumpDAG()); - AggregateProjectionCandidate candidate{.info = std::move(info), .dag = std::move(proj_dag)}; - candidate.projection = projection; - candidates.emplace_back(std::move(candidate)); - } - } - - if (candidates.empty()) - return false; - } - - MergeTreeDataSelectExecutor reader(reading->getMergeTreeData()); - if (context->getSettingsRef().select_sequential_consistency) { if (const StorageReplicatedMergeTree * replicated = dynamic_cast(&reading->getMergeTreeData())) max_added_blocks = std::make_shared(replicated->getMaxAddedBlocks()); } - for (auto & candidate : candidates) + auto candidates = getAggregateProjectionCandidates(node, *aggregating, *reading, max_added_blocks); + + AggregateProjectionCandidate * best_candidate = nullptr; + if (candidates.minmax_projection) + best_candidate = &candidates.minmax_projection->candidate; + else if (candidates.real.empty()) + return false; + + MergeTreeDataSelectExecutor reader(reading->getMergeTreeData()); + const auto & parts = reading->getParts(); + const auto & query_info = reading->getQueryInfo(); + + for (auto & candidate : candidates.real) { MergeTreeData::DataPartsVector projection_parts; MergeTreeData::DataPartsVector normal_parts; @@ -661,7 +691,7 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) continue; ActionDAGNodes added_filter_nodes; - if (filter_node) + if (candidates.has_filter) added_filter_nodes.nodes.push_back(candidate.dag->getOutputs().front()); auto projection_result_ptr = reader.estimateNumMarksToRead( @@ -700,22 +730,22 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) best_candidate = &candidate; } - if (!best_candidate && !minmax_projection) + if (!best_candidate) return false; QueryPlanStepPtr projection_reading; bool has_nornal_parts; - if (minmax_projection) + if (candidates.minmax_projection) { - LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Minmax proj block {}", minmax_count_projection_block.dumpStructure()); + LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Minmax proj block {}", candidates.minmax_projection->minmax_count_projection_block.dumpStructure()); - Pipe pipe(std::make_shared(std::move(minmax_count_projection_block))); + Pipe pipe(std::make_shared(std::move(candidates.minmax_projection->minmax_count_projection_block))); projection_reading = std::make_unique(std::move(pipe)); - has_nornal_parts = !minmax_projection_normal_parts.empty(); + has_nornal_parts = !candidates.minmax_projection->minmax_projection_normal_parts.empty(); if (has_nornal_parts) - reading->resetParts(std::move(minmax_projection_normal_parts)); + reading->resetParts(std::move(candidates.minmax_projection->minmax_projection_normal_parts)); } else { @@ -726,6 +756,9 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Proj snapshot {}", proj_snapshot->getColumns(GetColumnsOptions::Kind::All).toString()); + auto query_info_copy = query_info; + query_info_copy.prewhere_info = nullptr; + projection_reading = reader.readFromParts( {}, best_candidate->dag->getRequiredColumnsNames(), @@ -756,7 +789,7 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) auto & projection_reading_node = nodes.emplace_back(QueryPlan::Node{.step = std::move(projection_reading)}); auto & expr_or_filter_node = nodes.emplace_back(); - if (filter_node) + if (candidates.has_filter) { expr_or_filter_node.step = std::make_unique( projection_reading_node.step->getOutputStream(), @@ -833,7 +866,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes) ActionsDAGPtr dag; ActionsDAG::NodeRawConstPtrs filter_nodes; bool need_remove_column = false; - if (!buildAggregatingDAG(*iter->node->children.front(), dag, filter_nodes, need_remove_column)) + if (!buildQueryDAG(*iter->node->children.front(), dag, filter_nodes, need_remove_column)) return false; const ActionsDAG::Node * filter_node = nullptr;