Properly process aliases for aggregation-by-partition optimization.

This commit is contained in:
Nikolai Kochetov 2023-11-02 11:19:37 +00:00
parent 7d4c97e8f3
commit 5b6bf58730
3 changed files with 91 additions and 23 deletions

View File

@ -335,6 +335,28 @@ const ActionsDAG::Node * ActionsDAG::tryFindInOutputs(const std::string & name)
return nullptr;
}
/// Resolve every name from `names` to the output node with that result name.
/// The returned nodes follow the order of `names`.
/// If several outputs share a result name, the last such output is used.
/// Throws UNKNOWN_IDENTIFIER if some name is not present among the outputs.
ActionsDAG::NodeRawConstPtrs ActionsDAG::findInOutpus(const Names & names) const
{
    /// Index outputs by result name. Assignment through operator[] means a later
    /// output overwrites an earlier one that has the same name.
    std::unordered_map<std::string_view, const Node *> output_by_name;
    for (const auto * output : outputs)
        output_by_name[output->result_name] = output;

    NodeRawConstPtrs found_nodes;
    found_nodes.reserve(names.size());

    for (const auto & name : names)
    {
        const auto pos = output_by_name.find(name);
        if (pos != output_by_name.end())
        {
            found_nodes.push_back(pos->second);
            continue;
        }

        throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
            "Unknown column: {}, there are only columns {}", name, dumpDAG());
    }

    return found_nodes;
}
void ActionsDAG::addOrReplaceInOutputs(const Node & node)
{
for (auto & output_node : outputs)
@ -441,23 +463,7 @@ void ActionsDAG::removeUnusedActions(const NameSet & required_names, bool allow_
/// Keep only the outputs listed in `required_names` (in that order), then drop
/// everything that is no longer needed by delegating to the
/// removeUnusedActions(allow_remove_inputs, allow_constant_folding) overload.
/// Throws UNKNOWN_IDENTIFIER (from findInOutpus) if a required name is not among the outputs.
void ActionsDAG::removeUnusedActions(const Names & required_names, bool allow_remove_inputs, bool allow_constant_folding)
{
    /// The name -> output node lookup lives in findInOutpus; it is not repeated here.
    auto required_nodes = findInOutpus(required_names);
    outputs.swap(required_nodes);
    removeUnusedActions(allow_remove_inputs, allow_constant_folding);
}
@ -528,6 +534,62 @@ void ActionsDAG::removeUnusedActions(bool allow_remove_inputs, bool allow_consta
std::erase_if(inputs, [&](const Node * node) { return !visited_nodes.contains(node); });
}
/// Build a new DAG that contains copies of the given nodes and of every node they depend on.
/// The copies of `outputs` become the outputs of the new DAG, in the same order;
/// every copied INPUT node is appended to the inputs of the new DAG.
/// If `remove_aliases` is set, ALIAS nodes are not copied: whatever referred to an alias
/// refers to the copy of the alias' first child instead.
ActionsDAGPtr ActionsDAG::cloneSubDAG(const NodeRawConstPtrs & outputs, bool remove_aliases)
{
    auto result = std::make_shared<ActionsDAG>();

    /// Original node -> node that represents it in the new DAG.
    std::unordered_map<const Node *, Node *> clones;

    /// State of the iterative depth-first traversal: a node and the index of its next child to look at.
    struct Visit
    {
        const Node * node = nullptr;
        size_t next_child = 0;
    };

    std::stack<Visit> pending;

    for (const auto * root : outputs)
    {
        if (clones.contains(root))
            continue;

        pending.push(Visit{root});
        while (!pending.empty())
        {
            auto & top = pending.top();
            const auto & kids = top.node->children;

            /// Advance past the children which already have a copy.
            for (; top.next_child < kids.size(); ++top.next_child)
                if (!clones.contains(kids[top.next_child]))
                    break;

            /// Some child is not copied yet: process it first and come back to this node later.
            if (top.next_child != kids.size())
            {
                pending.push(Visit{kids[top.next_child]});
                continue;
            }

            /// All children are ready, so the node itself can be handled now.
            const Node * source = top.node;
            const bool skip_alias = remove_aliases && source->type == ActionType::ALIAS;

            Node * clone = skip_alias
                ? clones[source->children.front()]
                : &result->nodes.emplace_back(*source);
            clones[source] = clone;

            if (source->type == ActionType::INPUT)
                result->inputs.push_back(clone);

            pending.pop();
        }
    }

    /// The copied nodes still point to children from the original DAG; redirect them to the copies.
    for (auto & copied : result->nodes)
        for (auto & child : copied.children)
            child = clones[child];

    for (const auto * root : outputs)
        result->outputs.push_back(clones[root]);

    return result;
}
static ColumnWithTypeAndName executeActionForHeader(const ActionsDAG::Node * node, ColumnsWithTypeAndName arguments)
{
ColumnWithTypeAndName res_column;

View File

@ -157,6 +157,9 @@ public:
/// Same, but return nullptr if node not found.
const Node * tryFindInOutputs(const std::string & name) const;
/// Find nodes in outputs for the list of names (one node per name, in the same order). Throws if some name is not found.
NodeRawConstPtrs findInOutpus(const Names & names) const;
/// Find first node with the same name in output nodes and replace it.
/// If it was not found, add the node to the end of outputs.
void addOrReplaceInOutputs(const Node & node);
@ -257,6 +260,8 @@ public:
ActionsDAGPtr clone() const;
/// Create a new DAG which contains copies of the given nodes and of all the nodes they depend on.
/// The copies of `outputs` become the outputs of the new DAG, in the same order.
/// If remove_aliases is set, ALIAS nodes are not copied and are replaced by their argument.
static ActionsDAGPtr cloneSubDAG(const NodeRawConstPtrs & outputs, bool remove_aliases);
/// Execute actions for header. Input block must have empty columns.
/// Result should be equal to the execution of ExpressionActions built from this DAG.
/// Actions are not changed, no expressions are compiled.

View File

@ -95,7 +95,7 @@ bool allOutputsDependsOnlyOnAllowedNodes(
{
const auto & match = matches.at(node);
/// Function could be mapped into its argument. In this case .monotonicity != std::nullopt (see matchTrees)
if (match.node && match.node->result_name == node->result_name && !match.monotonicity)
if (match.node && !match.monotonicity)
res = irreducible_nodes.contains(match.node);
}
@ -155,9 +155,10 @@ bool isPartitionKeySuitsGroupByKey(
return false;
/// We are interested only in calculations required to obtain group by keys (and not aggregate function arguments for example).
group_by_actions->removeUnusedActions(aggregating.getParams().keys);
auto key_nodes = group_by_actions->findInOutpus(aggregating.getParams().keys);
auto group_by_key_actions = ActionsDAG::cloneSubDAG(key_nodes, true);
const auto & gb_key_required_columns = group_by_actions->getRequiredColumnsNames();
const auto & gb_key_required_columns = group_by_key_actions->getRequiredColumnsNames();
const auto & partition_actions = reading.getStorageMetadata()->getPartitionKey().expression->getActionsDAG();
@ -166,9 +167,9 @@ bool isPartitionKeySuitsGroupByKey(
if (std::ranges::find(gb_key_required_columns, col) == gb_key_required_columns.end())
return false;
const auto irreducibe_nodes = removeInjectiveFunctionsFromResultsRecursively(group_by_actions);
const auto irreducibe_nodes = removeInjectiveFunctionsFromResultsRecursively(group_by_key_actions);
const auto matches = matchTrees(group_by_actions->getOutputs(), partition_actions);
const auto matches = matchTrees(group_by_key_actions->getOutputs(), partition_actions);
return allOutputsDependsOnlyOnAllowedNodes(partition_actions, irreducibe_nodes, matches);
}
@ -206,7 +207,7 @@ size_t tryAggregatePartitionsIndependently(QueryPlan::Node * node, QueryPlan::No
return 0;
if (!reading->willOutputEachPartitionThroughSeparatePort()
&& isPartitionKeySuitsGroupByKey(*reading, expression_step->getExpression()->clone(), *aggregating_step))
&& isPartitionKeySuitsGroupByKey(*reading, expression_step->getExpression(), *aggregating_step))
{
if (reading->requestOutputEachPartitionThroughSeparatePort())
aggregating_step->skipMerging();