Make 01710_aggregate_projections work.

This commit is contained in:
Nikolai Kochetov 2023-02-06 18:42:58 +00:00
parent 310a4c69af
commit a542626fa3
4 changed files with 46 additions and 25 deletions

View File

@ -724,7 +724,7 @@ NameSet ActionsDAG::foldActionsByProjection(
}
ActionsDAGPtr ActionsDAG::foldActionsByProjection(const std::unordered_map<const Node *, std::string> & new_inputs)
ActionsDAGPtr ActionsDAG::foldActionsByProjection(const std::unordered_map<const Node *, std::string> & new_inputs, const NodeRawConstPtrs & required_outputs)
{
auto dag = std::make_unique<ActionsDAG>();
std::unordered_map<const Node *, size_t> new_input_to_pos;
@ -737,7 +737,7 @@ ActionsDAGPtr ActionsDAG::foldActionsByProjection(const std::unordered_map<const
};
std::vector<Frame> stack;
for (const auto * output : outputs)
for (const auto * output : required_outputs)
{
if (mapping.contains(output))
continue;
@ -754,11 +754,15 @@ ActionsDAGPtr ActionsDAG::foldActionsByProjection(const std::unordered_map<const
{
const auto & [new_input, rename] = *it;
const auto * node = &dag->addInput(new_input->result_name, new_input->result_type);
auto & node = mapping[frame.node];
if (!node)
{
node = &dag->addInput(new_input->result_name, new_input->result_type);
if (!rename.empty() && new_input->result_name != rename)
node = &dag->addAlias(*node, rename);
}
mapping.emplace(frame.node, node);
stack.pop_back();
continue;
}
@ -786,11 +790,12 @@ ActionsDAGPtr ActionsDAG::foldActionsByProjection(const std::unordered_map<const
for (auto & child : node.children)
child = mapping[child];
mapping[frame.node] = &node;
stack.pop_back();
}
}
for (const auto * output : outputs)
for (const auto * output : required_outputs)
dag->outputs.push_back(mapping[output]);
return dag;

View File

@ -214,7 +214,7 @@ public:
const String & predicate_column_name = {},
bool add_missing_keys = true);
ActionsDAGPtr foldActionsByProjection(const std::unordered_map<const Node *, std::string> & new_inputs);
static ActionsDAGPtr foldActionsByProjection(const std::unordered_map<const Node *, std::string> & new_inputs, const NodeRawConstPtrs & required_outputs);
/// Reorder the output nodes using given position mapping.
void reorderAggregationKeysForProjection(const std::unordered_map<std::string_view, size_t> & key_names_pos_map);

View File

@ -467,7 +467,8 @@ void AggregatingStep::describePipeline(FormatSettings & settings) const
bool AggregatingStep::canUseProjection() const
{
return grouping_sets_params.empty() && sort_description_for_merging.empty() && !memory_bound_merging_of_aggregation_results_enabled;
//LOG_TRACE(&Poco::Logger::get("AggregatingStep"), "canUseProjection {} {} {}", grouping_sets_params.size(), sort_description_for_merging.size(), memory_bound_merging_of_aggregation_results_enabled);
return grouping_sets_params.empty() && sort_description_for_merging.empty(); // && !memory_bound_merging_of_aggregation_results_enabled;
}
void AggregatingStep::requestOnlyMergeForAggregateProjection(const DataStream & input_stream)

View File

@ -154,6 +154,7 @@ struct AggregateProjectionCandidate
ActionsDAGPtr analyzeAggregateProjection(
const AggregateProjectionInfo & info,
ActionsDAG & query_dag,
const ActionsDAG::Node * filter_node,
const Names & keys,
const AggregateDescriptions & aggregates)
{
@ -169,7 +170,11 @@ ActionsDAGPtr analyzeAggregateProjection(
for (const auto * output : info.before_aggregation->getOutputs())
proj_index.emplace(output->result_name, output);
key_nodes.reserve(keys.size());
key_nodes.reserve(keys.size() + 1);
if (filter_node)
key_nodes.push_back(filter_node);
for (const auto & key : keys)
{
auto it = index.find(key);
@ -270,8 +275,8 @@ ActionsDAGPtr analyzeAggregateProjection(
if (args.size() < aggregate.argument_names.size())
continue;
for (const auto * node : args)
split_nodes.insert(node);
// for (const auto * node : args)
// split_nodes.insert(node);
match = AggFuncMatch{idx, std::move(args)};
}
@ -302,14 +307,8 @@ ActionsDAGPtr analyzeAggregateProjection(
};
std::stack<Frame> stack;
for (const auto & key : keys)
for (const auto * key_node : key_nodes)
{
auto it = index.find(key);
/// This should not happen ideally.
if (it == index.end())
break;
const auto * key_node = it->second;
if (visited.contains(key_node))
continue;
@ -343,8 +342,11 @@ ActionsDAGPtr analyzeAggregateProjection(
}
/// Not a match and there is no matched child.
if (frame.node->children.empty())
if (frame.node->type == ActionsDAG::ActionType::INPUT)
{
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Cannot find match for {}", frame.node->result_name);
return {};
}
/// Not a match, but all children matched.
visited.insert(frame.node);
@ -356,7 +358,13 @@ ActionsDAGPtr analyzeAggregateProjection(
for (const auto * node : split_nodes)
new_inputs[node] = matches[node].node->result_name;
return query_dag.foldActionsByProjection(new_inputs);
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Folding actions by projection");
auto proj_dag = query_dag.foldActionsByProjection(new_inputs, key_nodes);
auto & proj_dag_outputs = proj_dag->getOutputs();
for (const auto & aggregate : aggregates)
proj_dag_outputs.push_back(&proj_dag->addInput(aggregate.column_name, aggregate.function->getResultType()));
return proj_dag;
}
void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
@ -418,7 +426,7 @@ void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
filter_node = &dag->addFunction(func_builder_and, std::move(filter_nodes), {});
}
dag->getOutputs().insert(dag->getOutputs().begin(), filter_node);
dag->getOutputs().push_back(filter_node);
}
ContextPtr context = reading->getContext();
@ -434,7 +442,7 @@ void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Try projection {}", projection->name);
auto info = getAggregatingProjectionInfo(*projection, context, metadata);
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection DAG {}", info.before_aggregation->dumpDAG());
if (auto proj_dag = analyzeAggregateProjection(info, *dag, keys, aggregates))
if (auto proj_dag = analyzeAggregateProjection(info, *dag, filter_node, keys, aggregates))
{
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection analyzed DAG {}", proj_dag->dumpDAG());
candidates.emplace_back(AggregateProjectionCandidate{
@ -532,11 +540,16 @@ void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
storage_snapshot->storage, storage_snapshot->metadata, storage_snapshot->object_columns); //, storage_snapshot->data);
proj_snapshot->addProjection(best_candidate->projection);
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Proj snapshot {}", proj_snapshot->getColumns(GetColumnsOptions::Kind::All).toString());
auto query_info_copy = query_info;
query_info_copy.prewhere_info = nullptr;
auto projection_reading = reader.readFromParts(
{},
best_candidate->dag->getRequiredColumnsNames(),
proj_snapshot,
query_info,
query_info_copy,
context,
reading->getMaxBlockSize(),
reading->getNumStreams(),
@ -544,6 +557,8 @@ void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
best_candidate->merge_tree_projection_select_result_ptr,
reading->isParallelReadingEnabled());
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection reading header {}", projection_reading->getOutputStream().header.dumpStructure());
projection_reading->setStepDescription(best_candidate->projection->name);
auto & projection_reading_node = nodes.emplace_back(QueryPlan::Node{.step = std::move(projection_reading)});
@ -568,7 +583,7 @@ void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
{
/// All parts are taken from projection
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Expr stream {}", expr_or_filter_node.step->getOutputStream().header.dumpStructure());
aggregating->requestOnlyMergeForAggregateProjection(expr_or_filter_node.step->getOutputStream());
node.children.front() = &expr_or_filter_node;