Aggregate Projections analysis using query plan [In progress]

This commit is contained in:
Nikolai Kochetov 2023-01-27 18:38:14 +00:00
parent 134ac9b2dd
commit 286a58801e
3 changed files with 81 additions and 3 deletions

View File

@ -723,6 +723,79 @@ NameSet ActionsDAG::foldActionsByProjection(
return next_required_columns;
}
ActionsDAGPtr ActionsDAG::foldActionsByProjection(const std::unordered_map<const Node *, std::string> & new_inputs)
{
auto dag = std::make_unique<ActionsDAG>();
std::unordered_map<const Node *, size_t> new_input_to_pos;
std::unordered_map<const Node *, const Node *> mapping;
struct Frame
{
const Node * node;
size_t next_child = 0;
};
std::vector<Frame> stack;
for (const auto * output : outputs)
{
if (mapping.contains(output))
continue;
stack.push_back({.node = output});
while (!stack.empty())
{
auto & frame = stack.back();
if (frame.next_child == 0)
{
auto it = new_inputs.find(frame.node);
if (it != new_inputs.end())
{
const auto & [new_input, rename] = *it;
const auto * node = &dag->addInput(new_input->result_name, new_input->result_type);
if (!rename.empty() && new_input->result_name != rename)
node = &dag->addAlias(*node, rename);
mapping.emplace(frame.node, node);
stack.pop_back();
continue;
}
}
const auto & children = frame.node->children;
while (frame.next_child < children.size() && !mapping.emplace(children[frame.next_child], nullptr).second)
++frame.next_child;
if (frame.next_child < children.size())
{
const auto * child = children[frame.next_child];
++frame.next_child;
stack.push_back({.node = child});
continue;
}
if (frame.node->type == ActionType::INPUT)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot fold actions for projection. Node {} requires input {} which does not belong to projection",
stack.front().node->result_name, frame.node->result_name);
auto & node = dag->nodes.emplace_back(*frame.node);
for (auto & child : node.children)
child = mapping[child];
stack.pop_back();
}
}
for (const auto * output : outputs)
dag->outputs.push_back(mapping[output]);
return dag;
}
void ActionsDAG::reorderAggregationKeysForProjection(const std::unordered_map<std::string_view, size_t> & key_names_pos_map)
{
::sort(outputs.begin(), outputs.end(), [&key_names_pos_map](const Node * lhs, const Node * rhs)

View File

@ -214,6 +214,8 @@ public:
const String & predicate_column_name = {},
bool add_missing_keys = true);
ActionsDAGPtr foldActionsByProjection(const std::unordered_map<const Node *, std::string> & new_inputs);
/// Reorder the output nodes using given position mapping.
void reorderAggregationKeysForProjection(const std::unordered_map<std::string_view, size_t> & key_names_pos_map);

View File

@ -148,9 +148,10 @@ struct AggregateProjectionCandidate
{
AggregateProjectionInfo info;
ProjectionDescription * projection;
ActionsDAGPtr dag;
};
std::optional<AggregateProjectionCandidate> analyzeAggregateProjection(
ActionsDAGPtr analyzeAggregateProjection(
//ProjectionDescription & projection,
AggregateProjectionInfo info,
ActionsDAG & query_dag,
@ -352,9 +353,11 @@ std::optional<AggregateProjectionCandidate> analyzeAggregateProjection(
}
}
std::unordered_map<const ActionsDAG::Node *, std::string> new_inputs;
for (const auto * node : split_nodes)
new_inputs[node] = matches[node].node->result_name;
return {};
return query_dag.foldActionsByProjection(new_inputs);
}
void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes &)