mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-12 09:22:05 +00:00
Aggregate Projections analysis using query plan [In progress]
This commit is contained in:
parent
9c6c6d9844
commit
134ac9b2dd
@ -6,6 +6,7 @@
|
||||
#include <Processors/QueryPlan/FilterStep.h>
|
||||
#include <Processors/QueryPlan/ReadFromMergeTree.h>
|
||||
#include <Interpreters/InterpreterSelectQuery.h>
|
||||
#include <stack>
|
||||
|
||||
namespace DB::QueryPlanOptimizations
|
||||
{
|
||||
@ -150,7 +151,7 @@ struct AggregateProjectionCandidate
|
||||
};
|
||||
|
||||
std::optional<AggregateProjectionCandidate> analyzeAggregateProjection(
|
||||
ProjectionDescription & projection,
|
||||
//ProjectionDescription & projection,
|
||||
AggregateProjectionInfo info,
|
||||
ActionsDAG & query_dag,
|
||||
const Names & keys,
|
||||
@ -164,6 +165,10 @@ std::optional<AggregateProjectionCandidate> analyzeAggregateProjection(
|
||||
for (const auto * output : query_dag.getOutputs())
|
||||
index.emplace(output->result_name, output);
|
||||
|
||||
std::unordered_map<std::string, const ActionsDAG::Node *> proj_index;
|
||||
for (const auto * output : info.before_aggregation->getOutputs())
|
||||
proj_index.emplace(output->result_name, output);
|
||||
|
||||
key_nodes.reserve(keys.size());
|
||||
for (const auto & key : keys)
|
||||
{
|
||||
@ -194,6 +199,8 @@ std::optional<AggregateProjectionCandidate> analyzeAggregateProjection(
|
||||
for (size_t i = 0; i < info.aggregates.size(); ++i)
|
||||
projection_aggregate_functions[info.aggregates[i].function->getName()].push_back(i);
|
||||
|
||||
std::unordered_set<const ActionsDAG::Node *> split_nodes;
|
||||
|
||||
struct AggFuncMatch
|
||||
{
|
||||
/// idx in projection
|
||||
@ -228,29 +235,44 @@ std::optional<AggregateProjectionCandidate> analyzeAggregateProjection(
|
||||
continue;
|
||||
|
||||
ActionsDAG::NodeRawConstPtrs args;
|
||||
args.reserve(aggregate.argument_names.size());
|
||||
for (const auto & name : aggregate.argument_names)
|
||||
size_t num_args = aggregate.argument_names.size();
|
||||
args.reserve(num_args);
|
||||
for (size_t arg = 0; arg < num_args; ++arg)
|
||||
{
|
||||
auto jt = index.find(name);
|
||||
const auto & query_name = aggregate.argument_names[arg];
|
||||
const auto & proj_name = candidate.argument_names[arg];
|
||||
|
||||
auto jt = index.find(query_name);
|
||||
/// This should not happen ideally.
|
||||
if (jt == index.end())
|
||||
break;
|
||||
|
||||
const auto * outer_node = jt->second;
|
||||
auto kt = matches.find(outer_node);
|
||||
if (kt == matches.end())
|
||||
const auto * query_node = jt->second;
|
||||
|
||||
auto kt = proj_index.find(proj_name);
|
||||
/// This should not happen ideally.
|
||||
if (kt == proj_index.end())
|
||||
break;
|
||||
|
||||
const auto & node_match = kt->second;
|
||||
if (!node_match.node || node_match.monotonicity)
|
||||
const auto * proj_node = kt->second;
|
||||
|
||||
auto mt = matches.find(query_node);
|
||||
if (mt == matches.end())
|
||||
break;
|
||||
|
||||
args.push_back(node_match.node);
|
||||
const auto & node_match = mt->second;
|
||||
if (node_match.node != proj_node || node_match.monotonicity)
|
||||
break;
|
||||
|
||||
args.push_back(query_node);
|
||||
}
|
||||
|
||||
if (args.size() < aggregate.argument_names.size())
|
||||
continue;
|
||||
|
||||
for (const auto * node : args)
|
||||
split_nodes.insert(node);
|
||||
|
||||
match = AggFuncMatch{idx, std::move(args)};
|
||||
}
|
||||
|
||||
@ -260,7 +282,79 @@ std::optional<AggregateProjectionCandidate> analyzeAggregateProjection(
|
||||
aggregate_function_matches.emplace_back(std::move(*match));
|
||||
}
|
||||
|
||||
std::unordered_set<const ActionsDAG::Node *> proj_key_nodes;
|
||||
for (const auto & key : info.keys)
|
||||
{
|
||||
auto it = proj_index.find(key.name);
|
||||
/// This should not happen ideally.
|
||||
if (it == proj_index.end())
|
||||
break;
|
||||
|
||||
proj_key_nodes.insert(it->second);
|
||||
}
|
||||
|
||||
std::unordered_set<const ActionsDAG::Node *> visited;
|
||||
|
||||
struct Frame
|
||||
{
|
||||
const ActionsDAG::Node * node;
|
||||
size_t next_child_to_visit = 0;
|
||||
};
|
||||
|
||||
std::stack<Frame> stack;
|
||||
for (const auto & key : keys)
|
||||
{
|
||||
auto it = index.find(key);
|
||||
/// This should not happen ideally.
|
||||
if (it == index.end())
|
||||
break;
|
||||
|
||||
const auto * key_node = it->second;
|
||||
if (visited.contains(key_node))
|
||||
continue;
|
||||
|
||||
stack.push({.node = key_node});
|
||||
|
||||
while (!stack.empty())
|
||||
{
|
||||
auto & frame = stack.top();
|
||||
|
||||
if (frame.next_child_to_visit == 0)
|
||||
{
|
||||
auto jt = matches.find(frame.node);
|
||||
if (jt != matches.end())
|
||||
{
|
||||
auto & match = jt->second;
|
||||
if (match.node && !match.monotonicity && proj_key_nodes.contains(match.node))
|
||||
{
|
||||
visited.insert(frame.node);
|
||||
split_nodes.insert(frame.node);
|
||||
stack.pop();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (frame.next_child_to_visit < frame.node->children.size())
|
||||
{
|
||||
stack.push({.node = frame.node->children[frame.next_child_to_visit]});
|
||||
++frame.next_child_to_visit;
|
||||
continue;
|
||||
}
|
||||
|
||||
/// Not a match and there is no matched child.
|
||||
if (frame.node->children.empty())
|
||||
return {};
|
||||
|
||||
/// Not a match, but all children matched.
|
||||
visited.insert(frame.node);
|
||||
stack.pop();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes &)
|
||||
@ -281,8 +375,8 @@ void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes &)
|
||||
if (!buildAggregatingDAG(node, dag, filter_nodes))
|
||||
return;
|
||||
|
||||
const auto & keys = aggregating->getParams().keys;
|
||||
const auto & aggregates = aggregating->getParams().aggregates;
|
||||
// const auto & keys = aggregating->getParams().keys;
|
||||
// const auto & aggregates = aggregating->getParams().aggregates;
|
||||
|
||||
auto * reading = typeid_cast<ReadFromMergeTree *>(reading_node->step.get());
|
||||
if (!reading)
|
||||
|
Loading…
Reference in New Issue
Block a user