This commit is contained in:
Nikita Taranov 2023-02-04 16:14:53 +00:00
parent 4fbbca075b
commit 5183cc6ee5

View File

@ -136,11 +136,11 @@ bool allOutputsDependsOnlyOnAllowedNodes(
/// No need to explicitly check that each function is deterministic, because it is a guaranteed property of partition key expression (checked on table creation).
/// So it is left only to check that each output node depends only on the allowed set of nodes (`irreducible_nodes`).
bool allOutputsDependsOnlyOnAllowedNodes(
const ActionsDAGPtr & partition_actions, const NodeSet & irreducible_nodes, const MatchedTrees::Matches & matches)
const ActionsDAG & partition_actions, const NodeSet & irreducible_nodes, const MatchedTrees::Matches & matches)
{
NodeMap visited;
bool res = true;
for (const auto & node : partition_actions->getOutputs())
for (const auto & node : partition_actions.getOutputs())
if (node->type != ActionsDAG::ActionType::INPUT)
res &= allOutputsDependsOnlyOnAllowedNodes(irreducible_nodes, matches, node, visited);
return res;
@ -151,27 +151,26 @@ bool allOutputsDependsOnlyOnAllowedNodes(
/// 2. To find col1, ..., coln we apply removeInjectiveFunctionsFromResultsRecursively to group by key actions.
/// 3. We match partition key actions with group by key actions to find col1', ..., coln' in partition key actions.
/// 4. We check that partition key is indeed a deterministic function of col1', ..., coln'.
bool isPartitionKeySuitsGroupByKey(const ReadFromMergeTree & reading, ActionsDAGPtr group_by_actions, const AggregatingStep & aggregating)
bool isPartitionKeySuitsGroupByKey(
const ReadFromMergeTree & reading, const ActionsDAGPtr & group_by_actions, const AggregatingStep & aggregating)
{
if (aggregating.isGroupingSets())
return false;
/// We are interested only in calculations required to obtain group by keys.
group_by_actions->removeUnusedActions(aggregating.getParams().keys);
if (group_by_actions->hasArrayJoin() || group_by_actions->hasStatefulFunctions() || group_by_actions->hasNonDeterministic())
return false;
const auto & gb_key_required_columns = group_by_actions->getRequiredColumnsNames();
const auto partition_actions = reading.getStorageMetadata()->getPartitionKey().expression->getActionsDAG().clone();
const auto & partition_actions = reading.getStorageMetadata()->getPartitionKey().expression->getActionsDAG();
/// Check that PK columns is a subset of GBK columns.
for (const auto & col : partition_actions->getRequiredColumnsNames())
for (const auto & col : partition_actions.getRequiredColumnsNames())
if (std::ranges::find(gb_key_required_columns, col) == gb_key_required_columns.end())
return false;
const auto irreducibe_nodes = removeInjectiveFunctionsFromResultsRecursively(group_by_actions);
const auto matches = matchTrees(*group_by_actions, *partition_actions);
const auto matches = matchTrees(*group_by_actions, partition_actions);
return allOutputsDependsOnlyOnAllowedNodes(partition_actions, irreducibe_nodes, matches);
}
@ -209,7 +208,7 @@ size_t tryAggregatePartitionsIndependently(QueryPlan::Node * node, QueryPlan::No
return 0;
if (!reading->willOutputEachPartitionThroughSeparatePort()
&& isPartitionKeySuitsGroupByKey(*reading, expression_step->getExpression()->clone(), *aggregating_step))
&& isPartitionKeySuitsGroupByKey(*reading, expression_step->getExpression(), *aggregating_step))
{
if (reading->requestOutputEachPartitionThroughSeparatePort())
aggregating_step->skipMerging();