Fix: remove redundant distinct with window functions

This commit is contained in:
Igor Nikonov 2024-05-14 14:27:36 +00:00
parent ebd0865fb0
commit a7898c85b8
3 changed files with 83 additions and 38 deletions

View File

@ -64,34 +64,59 @@ namespace
return non_const_columns;
}
/// build actions DAG from stack of steps
ActionsDAGPtr buildActionsForPlanPath(std::vector<ActionsDAGPtr> & dag_stack)
{
if (dag_stack.empty())
return nullptr;
ActionsDAGPtr path_actions = dag_stack.back()->clone();
dag_stack.pop_back();
while (!dag_stack.empty())
{
ActionsDAGPtr clone = dag_stack.back()->clone();
logActionsDAG("DAG to merge", clone);
dag_stack.pop_back();
path_actions->mergeInplace(std::move(*clone));
}
return path_actions;
}
bool compareAggregationKeysWithDistinctColumns(
const Names & aggregation_keys, const DistinctColumns & distinct_columns, const ActionsDAGPtr & path_actions)
const Names & aggregation_keys, const DistinctColumns & distinct_columns, std::vector<std::vector<ActionsDAGPtr>> actions_chain)
{
logDebug("aggregation_keys", aggregation_keys);
logDebug("aggregation_keys size", aggregation_keys.size());
logDebug("distinct_columns size", distinct_columns.size());
std::set<std::string_view> original_distinct_columns;
FindOriginalNodeForOutputName original_node_finder(path_actions);
for (const auto & column : distinct_columns)
std::set<String> current_columns(begin(distinct_columns), end(distinct_columns));
std::set<String> source_columns;
for (auto & actions : actions_chain)
{
logDebug("distinct column name", column);
const auto * alias_node = original_node_finder.find(String(column));
if (!alias_node)
FindOriginalNodeForOutputName original_node_finder(buildActionsForPlanPath(actions));
for (const auto & column : current_columns)
{
logDebug("original name for alias is not found", column);
original_distinct_columns.insert(column);
}
else
{
logDebug("alias result name", alias_node->result_name);
original_distinct_columns.insert(alias_node->result_name);
logDebug("distinct column name", column);
const auto * alias_node = original_node_finder.find(String(column));
if (!alias_node)
{
logDebug("original name for alias is not found", column);
source_columns.insert(String(column));
}
else
{
logDebug("alias result name", alias_node->result_name);
source_columns.insert(alias_node->result_name);
}
}
current_columns = std::move(source_columns);
source_columns.clear();
}
/// if aggregation keys are part of distinct columns then rows already distinct
for (const auto & key : aggregation_keys)
{
if (!original_distinct_columns.contains(key))
if (!current_columns.contains(key))
{
logDebug("aggregation key NOT found: {}", key);
return false;
@ -122,30 +147,13 @@ namespace
return false;
}
/// build actions DAG from stack of steps
ActionsDAGPtr buildActionsForPlanPath(std::vector<ActionsDAGPtr> & dag_stack)
{
if (dag_stack.empty())
return nullptr;
ActionsDAGPtr path_actions = dag_stack.back()->clone();
dag_stack.pop_back();
while (!dag_stack.empty())
{
ActionsDAGPtr clone = dag_stack.back()->clone();
logActionsDAG("DAG to merge", clone);
dag_stack.pop_back();
path_actions->mergeInplace(std::move(*clone));
}
return path_actions;
}
bool passTillAggregation(const QueryPlan::Node * distinct_node)
{
const DistinctStep * distinct_step = typeid_cast<DistinctStep *>(distinct_node->step.get());
chassert(distinct_step);
std::vector<ActionsDAGPtr> dag_stack;
std::vector<std::vector<ActionsDAGPtr>> actions_chain;
const DistinctStep * inner_distinct_step = nullptr;
const IQueryPlanStep * aggregation_before_distinct = nullptr;
const QueryPlan::Node * node = distinct_node;
@ -163,6 +171,12 @@ namespace
break;
}
if (typeid_cast<const WindowStep *>(current_step))
{
actions_chain.push_back(std::move(dag_stack));
dag_stack.clear();
}
if (const auto * const expr = typeid_cast<const ExpressionStep *>(current_step); expr)
dag_stack.push_back(expr->getExpression());
else if (const auto * const filter = typeid_cast<const FilterStep *>(current_step); filter)
@ -177,16 +191,19 @@ namespace
if (aggregation_before_distinct)
{
ActionsDAGPtr actions = buildActionsForPlanPath(dag_stack);
logActionsDAG("aggregation pass: merged DAG", actions);
const auto distinct_columns = getDistinctColumns(distinct_step);
if (const auto * aggregating_step = typeid_cast<const AggregatingStep *>(aggregation_before_distinct); aggregating_step)
return compareAggregationKeysWithDistinctColumns(aggregating_step->getParams().keys, distinct_columns, actions);
{
return compareAggregationKeysWithDistinctColumns(
aggregating_step->getParams().keys, distinct_columns, std::move(actions_chain));
}
else if (const auto * merging_aggregated_step = typeid_cast<const MergingAggregatedStep *>(aggregation_before_distinct);
merging_aggregated_step)
return compareAggregationKeysWithDistinctColumns(merging_aggregated_step->getParams().keys, distinct_columns, actions);
{
return compareAggregationKeysWithDistinctColumns(
merging_aggregated_step->getParams().keys, distinct_columns, std::move(actions_chain));
}
}
return false;

View File

@ -0,0 +1,7 @@
1
2
3
--------
1 2024-05-14 00:00:00
2 2024-05-14 00:00:00
3 2024-05-14 00:00:00

View File

@ -0,0 +1,21 @@
DROP TABLE IF EXISTS tab;
DROP TABLE IF EXISTS tab_v;
CREATE TABLE tab (id Int32, val Nullable(Float64), dt Nullable(DateTime64(6)), type Nullable(Int32)) ENGINE = MergeTree ORDER BY id;
insert into tab values (1,10,now(),1),(2,20,now()-1,1),(3,20,now()-2,2),(4,40,now()-3,3),(5,50,now()-4,3);
CREATE VIEW tab_v AS SELECT
t1.type AS type,
sum(t1.val) AS sval,
toStartOfDay(t1.dt) AS sday,
anyLast(sval) OVER w AS lval
FROM tab AS t1
GROUP BY
type,
sday
WINDOW w AS (PARTITION BY type);
select distinct type from tab_v order by type;
select '--------';
select distinct type, sday from tab_v order by type, sday;