From a542626fa35a7f5a37eb98358738554188f74a3c Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 6 Feb 2023 18:42:58 +0000 Subject: [PATCH] Make 01710_aggregate_projections work. --- src/Interpreters/ActionsDAG.cpp | 19 +++++--- src/Interpreters/ActionsDAG.h | 2 +- src/Processors/QueryPlan/AggregatingStep.cpp | 3 +- .../Optimizations/optimizeUseProjections.cpp | 47 ++++++++++++------- 4 files changed, 46 insertions(+), 25 deletions(-) diff --git a/src/Interpreters/ActionsDAG.cpp b/src/Interpreters/ActionsDAG.cpp index d2a1d7becfe..0df7b71f719 100644 --- a/src/Interpreters/ActionsDAG.cpp +++ b/src/Interpreters/ActionsDAG.cpp @@ -724,7 +724,7 @@ NameSet ActionsDAG::foldActionsByProjection( } -ActionsDAGPtr ActionsDAG::foldActionsByProjection(const std::unordered_map & new_inputs) +ActionsDAGPtr ActionsDAG::foldActionsByProjection(const std::unordered_map & new_inputs, const NodeRawConstPtrs & required_outputs) { auto dag = std::make_unique(); std::unordered_map new_input_to_pos; @@ -737,7 +737,7 @@ ActionsDAGPtr ActionsDAG::foldActionsByProjection(const std::unordered_map stack; - for (const auto * output : outputs) + for (const auto * output : required_outputs) { if (mapping.contains(output)) continue; @@ -754,11 +754,15 @@ ActionsDAGPtr ActionsDAG::foldActionsByProjection(const std::unordered_mapaddInput(new_input->result_name, new_input->result_type); - if (!rename.empty() && new_input->result_name != rename) - node = &dag->addAlias(*node, rename); + auto & node = mapping[frame.node]; + + if (!node) + { + node = &dag->addInput(new_input->result_name, new_input->result_type); + if (!rename.empty() && new_input->result_name != rename) + node = &dag->addAlias(*node, rename); + } - mapping.emplace(frame.node, node); stack.pop_back(); continue; } @@ -786,11 +790,12 @@ ActionsDAGPtr ActionsDAG::foldActionsByProjection(const std::unordered_mapoutputs.push_back(mapping[output]); return dag; diff --git a/src/Interpreters/ActionsDAG.h b/src/Interpreters/ActionsDAG.h index b23c87b4903..93280c66668 100644 --- a/src/Interpreters/ActionsDAG.h +++ b/src/Interpreters/ActionsDAG.h @@ -214,7 +214,7 @@ public: const String & predicate_column_name = {}, bool add_missing_keys = true); - ActionsDAGPtr foldActionsByProjection(const std::unordered_map & new_inputs); + static ActionsDAGPtr foldActionsByProjection(const std::unordered_map & new_inputs, const NodeRawConstPtrs & required_outputs); /// Reorder the output nodes using given position mapping. void reorderAggregationKeysForProjection(const std::unordered_map & key_names_pos_map); diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp index db85764ef33..6aacc2f8fae 100644 --- a/src/Processors/QueryPlan/AggregatingStep.cpp +++ b/src/Processors/QueryPlan/AggregatingStep.cpp @@ -467,7 +467,8 @@ void AggregatingStep::describePipeline(FormatSettings & settings) const bool AggregatingStep::canUseProjection() const { - return grouping_sets_params.empty() && sort_description_for_merging.empty() && !memory_bound_merging_of_aggregation_results_enabled; + //LOG_TRACE(&Poco::Logger::get("AggregatingStep"), "canUseProjection {} {} {}", grouping_sets_params.size(), sort_description_for_merging.size(), memory_bound_merging_of_aggregation_results_enabled); + return grouping_sets_params.empty() && sort_description_for_merging.empty(); // && !memory_bound_merging_of_aggregation_results_enabled; } void AggregatingStep::requestOnlyMergeForAggregateProjection(const DataStream & input_stream) diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp index e2a942ed2dc..185cc10e5ff 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeUseProjections.cpp @@ -154,6 +154,7 @@ struct AggregateProjectionCandidate ActionsDAGPtr analyzeAggregateProjection( const AggregateProjectionInfo & info, ActionsDAG & query_dag, + const ActionsDAG::Node * filter_node, const Names & keys, const AggregateDescriptions & aggregates) { @@ -169,7 +170,11 @@ ActionsDAGPtr analyzeAggregateProjection( for (const auto * output : info.before_aggregation->getOutputs()) proj_index.emplace(output->result_name, output); - key_nodes.reserve(keys.size()); + key_nodes.reserve(keys.size() + 1); + + if (filter_node) + key_nodes.push_back(filter_node); + for (const auto & key : keys) { auto it = index.find(key); @@ -270,8 +275,8 @@ ActionsDAGPtr analyzeAggregateProjection( if (args.size() < aggregate.argument_names.size()) continue; - for (const auto * node : args) - split_nodes.insert(node); + // for (const auto * node : args) + // split_nodes.insert(node); match = AggFuncMatch{idx, std::move(args)}; } @@ -302,14 +307,8 @@ ActionsDAGPtr analyzeAggregateProjection( }; std::stack stack; - for (const auto & key : keys) + for (const auto * key_node : key_nodes) { - auto it = index.find(key); - /// This should not happen ideally. - if (it == index.end()) - break; - - const auto * key_node = it->second; if (visited.contains(key_node)) continue; @@ -343,8 +342,11 @@ ActionsDAGPtr analyzeAggregateProjection( } /// Not a match and there is no matched child. - if (frame.node->children.empty()) + if (frame.node->type == ActionsDAG::ActionType::INPUT) + { + LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Cannot find match for {}", frame.node->result_name); return {}; + } /// Not a match, but all children matched. visited.insert(frame.node); @@ -356,7 +358,13 @@ ActionsDAGPtr analyzeAggregateProjection( for (const auto * node : split_nodes) new_inputs[node] = matches[node].node->result_name; - return query_dag.foldActionsByProjection(new_inputs); + LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Folding actions by projection"); + auto proj_dag = query_dag.foldActionsByProjection(new_inputs, key_nodes); + auto & proj_dag_outputs = proj_dag->getOutputs(); + for (const auto & aggregate : aggregates) + proj_dag_outputs.push_back(&proj_dag->addInput(aggregate.column_name, aggregate.function->getResultType())); + + return proj_dag; } void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) @@ -418,7 +426,7 @@ void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) filter_node = &dag->addFunction(func_builder_and, std::move(filter_nodes), {}); } - dag->getOutputs().insert(dag->getOutputs().begin(), filter_node); + dag->getOutputs().push_back(filter_node); } ContextPtr context = reading->getContext(); @@ -434,7 +442,7 @@ void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Try projection {}", projection->name); auto info = getAggregatingProjectionInfo(*projection, context, metadata); LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection DAG {}", info.before_aggregation->dumpDAG()); - if (auto proj_dag = analyzeAggregateProjection(info, *dag, keys, aggregates)) + if (auto proj_dag = analyzeAggregateProjection(info, *dag, filter_node, keys, aggregates)) { LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection analyzed DAG {}", proj_dag->dumpDAG()); candidates.emplace_back(AggregateProjectionCandidate{ @@ -532,11 +540,16 @@ void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) storage_snapshot->storage, storage_snapshot->metadata, storage_snapshot->object_columns); //, storage_snapshot->data); proj_snapshot->addProjection(best_candidate->projection); + LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Proj snapshot {}", proj_snapshot->getColumns(GetColumnsOptions::Kind::All).toString()); + + auto query_info_copy = query_info; + query_info_copy.prewhere_info = nullptr; + auto projection_reading = reader.readFromParts( {}, best_candidate->dag->getRequiredColumnsNames(), proj_snapshot, - query_info, + query_info_copy, context, reading->getMaxBlockSize(), reading->getNumStreams(), @@ -544,6 +557,8 @@ void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) best_candidate->merge_tree_projection_select_result_ptr, reading->isParallelReadingEnabled()); + LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection reading header {}", projection_reading->getOutputStream().header.dumpStructure()); + projection_reading->setStepDescription(best_candidate->projection->name); auto & projection_reading_node = nodes.emplace_back(QueryPlan::Node{.step = std::move(projection_reading)}); @@ -568,7 +583,7 @@ void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes) { /// All parts are taken from projection - + LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Expr stream {}", expr_or_filter_node.step->getOutputStream().header.dumpStructure()); aggregating->requestOnlyMergeForAggregateProjection(expr_or_filter_node.step->getOutputStream()); node.children.front() = &expr_or_filter_node;