Removing DISTINCT on top of aggregation if possible

This commit is contained in:
Igor Nikonov 2023-02-09 11:37:15 +00:00
parent 5709c9832c
commit df23731061
5 changed files with 250 additions and 39 deletions

View File

@ -18,6 +18,7 @@ public:
bool optimize_distinct_in_order_);
String getName() const override { return "Distinct"; }
const Names & getColumnNames() const { return columns; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
@ -33,7 +34,7 @@ private:
SizeLimits set_size_limits;
UInt64 limit_hint;
Names columns;
const Names columns;
bool pre_distinct;
bool optimize_distinct_in_order;
};

View File

@ -1,14 +1,22 @@
#include <memory>
#include <Interpreters/ActionsDAG.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FillingStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/IntersectOrExceptStep.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/LimitByStep.h>
#include <Processors/QueryPlan/LimitStep.h>
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/RollupStep.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPlan/WindowStep.h>
#include <Common/logger_useful.h>
#include <Common/typeid_cast.h>
@ -17,23 +25,29 @@ namespace DB::QueryPlanOptimizations
namespace
{
constexpr bool debug_logging_enabled = false;
/// Compile-time switch for the debug output of logActionsDAG() / logDebug().
/// Must stay off in committed code: turn it on only for local debugging of this optimization.
constexpr bool debug_logging_enabled = false;
/// Debug helper: dumps `actions` to the "redundantDistinct" logger, prefixed with `prefix`.
/// Does nothing unless debug_logging_enabled is set.
/// `prefix` is passed as a format argument, so it is printed verbatim (placeholders inside it are not expanded).
void logActionsDAG(const String & prefix, const ActionsDAGPtr & actions)
{
    if constexpr (debug_logging_enabled)
    {
        /// buildActionsForPlanPath() returns nullptr when there were no DAGs on the path,
        /// and callers pass its result here directly - nothing to dump in that case
        if (!actions)
            return;

        LOG_DEBUG(&Poco::Logger::get("redundantDistinct"), "{}: {}", prefix, actions->dumpDAG());
    }
}
std::set<std::string_view> getDistinctColumns(const DistinctStep * distinct)
void logDebug(String key, String value)
{
if constexpr (debug_logging_enabled)
LOG_DEBUG(&Poco::Logger::get("redundantDistinct"), "{} : {}", key, value);
}
using DistinctColumns = std::set<std::string_view>;
DistinctColumns getDistinctColumns(const DistinctStep * distinct)
{
/// find non-const columns in DISTINCT
const ColumnsWithTypeAndName & distinct_columns = distinct->getOutputStream().header.getColumnsWithTypeAndName();
std::set<std::string_view> non_const_columns;
const Names & column_names = distinct->getColumnNames();
for (const auto & column : distinct_columns)
{
if (!isColumnConst(*column.column))
if (!isColumnConst(*column.column) && find(cbegin(column_names), cend(column_names), column.name) != column_names.cend())
non_const_columns.emplace(column.name);
}
return non_const_columns;
@ -67,42 +81,138 @@ namespace
return node;
}
bool canRemoveDistinct(const QueryPlan::Node * distinct_node)
/// Checks whether DISTINCT columns correspond to aggregation keys.
/// Returns true when both have the same number of entries and every DISTINCT column,
/// resolved to its original name through `path_actions`, is found among `aggregation_keys`.
/// `path_actions` may be nullptr (no Expression/Filter steps between DISTINCT and aggregation),
/// in this case DISTINCT column names are compared with the keys as is.
bool compareAggregationKeysWithDistinctColumns(
    const Names & aggregation_keys, const DistinctColumns & distinct_columns, const ActionsDAGPtr & path_actions)
{
    if (aggregation_keys.size() != distinct_columns.size())
        return false;

    /// compare DISTINCT columns with aggregation keys
    for (const auto & column : distinct_columns)
    {
        std::string_view original_column_name = column;
        if (path_actions)
        {
            /// the column could be renamed on the way up from aggregation, find the name it had originally
            const auto * alias_node = getOriginalNodeForOutputAlias(path_actions, String(column));
            if (!alias_node)
                return false;

            original_column_name = alias_node->result_name;
        }

        if (std::find(cbegin(aggregation_keys), cend(aggregation_keys), original_column_name) == aggregation_keys.cend())
            return false;
    }
    return true;
}
/// Tells whether the walk from DISTINCT down the plan may continue through `step`.
/// Expression and Filter steps are allowed only if their actions contain no arrayJoin().
/// Distinct, Limit, LimitBy, Sorting and Window steps are always allowed, any other step is not.
bool checkStepToAllowOptimization(const IQueryPlanStep * step)
{
    if (const auto * expression_step = typeid_cast<const ExpressionStep *>(step))
        return !expression_step->getExpression()->hasArrayJoin();

    if (const auto * filter_step = typeid_cast<const FilterStep *>(step))
        return !filter_step->getExpression()->hasArrayJoin();

    const bool is_allowed_step = typeid_cast<const DistinctStep *>(step) || typeid_cast<const LimitStep *>(step)
        || typeid_cast<const LimitByStep *>(step) || typeid_cast<const SortingStep *>(step)
        || typeid_cast<const WindowStep *>(step);
    return is_allowed_step;
}
/// Build single actions DAG from stack of DAGs collected while walking down the plan.
/// The DAG pushed last (the deepest step on the path) is the base, the rest are merged into it one by one
/// in reverse order of pushing.
/// The stack is consumed: it is empty when the function returns.
/// Returns nullptr if the stack is empty.
ActionsDAGPtr buildActionsForPlanPath(std::vector<ActionsDAGPtr> & dag_stack)
{
    if (dag_stack.empty())
        return nullptr;

    /// The stack holds pointers to the DAGs which belong to plan steps.
    /// Clone the base DAG as well, otherwise mergeInplace() below would modify the step's own expression.
    ActionsDAGPtr path_actions = dag_stack.back()->clone();
    dag_stack.pop_back();

    while (!dag_stack.empty())
    {
        ActionsDAGPtr clone = dag_stack.back()->clone();
        dag_stack.pop_back();
        path_actions->mergeInplace(std::move(*clone));
    }
    return path_actions;
}
/// Checks if DISTINCT is redundant because of an aggregation below it.
/// Walks from `distinct_node` down by first children, collecting DAGs of Expression/Filter steps on the way.
/// The walk stops at
/// - aggregation-like step (Aggregating, Cube, Rollup, TotalsHaving),
/// - step which is not accepted by checkStepToAllowOptimization(),
/// - another DISTINCT step - then the answer is false, such case is not handled here.
/// Returns true only if Aggregating/Cube/Rollup step is found and DISTINCT columns match its keys.
/// For TotalsHaving the result is always false.
bool passTillAggregation(const QueryPlan::Node * distinct_node)
{
    const DistinctStep * distinct_step = typeid_cast<DistinctStep *>(distinct_node->step.get());
    chassert(distinct_step);

    std::vector<ActionsDAGPtr> dag_stack;
    const DistinctStep * inner_distinct_step = nullptr;
    const IQueryPlanStep * aggregation_before_distinct = nullptr;
    const QueryPlan::Node * node = distinct_node;
    while (!node->children.empty())
    {
        const IQueryPlanStep * current_step = node->step.get();
        if (typeid_cast<const AggregatingStep *>(current_step) || typeid_cast<const CubeStep *>(current_step)
            || typeid_cast<const RollupStep *>(current_step) || typeid_cast<const TotalsHavingStep *>(current_step))
        {
            aggregation_before_distinct = current_step;
            break;
        }

        if (!checkStepToAllowOptimization(current_step))
        {
            logDebug("aggregation pass: stopped by allow check on step", current_step->getName());
            break;
        }

        /// remember actions on the path, they are needed later to map DISTINCT columns to their original names
        if (const auto * const expr = typeid_cast<const ExpressionStep *>(current_step); expr)
            dag_stack.push_back(expr->getExpression());
        else if (const auto * const filter = typeid_cast<const FilterStep *>(current_step); filter)
            dag_stack.push_back(filter->getExpression());

        node = node->children.front();
        if (inner_distinct_step = typeid_cast<DistinctStep *>(node->step.get()); inner_distinct_step)
            break;
    }

    if (inner_distinct_step)
        return false;

    if (!aggregation_before_distinct)
        return false;

    ActionsDAGPtr actions = buildActionsForPlanPath(dag_stack);
    /// the prefix is a format argument, not a format string, so it must not contain "{}"
    logActionsDAG("aggregation pass: merged DAG", actions);

    const auto distinct_columns = getDistinctColumns(distinct_step);

    if (const auto * aggregating_step = typeid_cast<const AggregatingStep *>(aggregation_before_distinct); aggregating_step)
        return compareAggregationKeysWithDistinctColumns(aggregating_step->getParams().keys, distinct_columns, actions);

    if (const auto * cube_step = typeid_cast<const CubeStep *>(aggregation_before_distinct); cube_step)
        return compareAggregationKeysWithDistinctColumns(cube_step->getParams().keys, distinct_columns, actions);

    if (const auto * rollup_step = typeid_cast<const RollupStep *>(aggregation_before_distinct); rollup_step)
        return compareAggregationKeysWithDistinctColumns(rollup_step->getParams().keys, distinct_columns, actions);

    return false;
}
bool passTillDistinct(const QueryPlan::Node * distinct_node)
{
const DistinctStep * distinct_step = typeid_cast<DistinctStep *>(distinct_node->step.get());
chassert(distinct_step);
const auto distinct_columns = getDistinctColumns(distinct_step);
std::vector<ActionsDAGPtr> dag_stack;
const DistinctStep * inner_distinct_step = nullptr;
const QueryPlan::Node * node = distinct_node;
while (!node->children.empty())
{
const IQueryPlanStep * current_step = node->step.get();
/// don't try to remove DISTINCT after step with many inputs, like union/join/intersect/except
if (current_step->getInputStreams().size() > 1)
break;
/// do not remove DISTINCT if there are steps which can generate new rows
if (typeid_cast<const ArrayJoinStep *>(current_step) || typeid_cast<const FillingStep *>(current_step))
if (!checkStepToAllowOptimization(current_step))
{
logDebug("distinct pass: stopped by allow check on step", current_step->getName());
break;
}
if (const auto * const expr = typeid_cast<const ExpressionStep *>(current_step); expr)
{
/// arrayJoin() can generate new rows
if (expr->getExpression()->hasArrayJoin())
break;
dag_stack.push_back(expr->getExpression());
}
if (const auto * const filter = typeid_cast<const FilterStep *>(current_step); filter)
{
/// arrayJoin() can generate new rows
if (filter->getExpression()->hasArrayJoin())
break;
else if (const auto * const filter = typeid_cast<const FilterStep *>(current_step); filter)
dag_stack.push_back(filter->getExpression());
}
node = node->children.front();
inner_distinct_step = typeid_cast<DistinctStep *>(node->step.get());
@ -120,7 +230,6 @@ namespace
if (inner_distinct_step->isPreliminary())
return false;
const auto distinct_columns = getDistinctColumns(distinct_step);
auto inner_distinct_columns = getDistinctColumns(inner_distinct_step);
if (distinct_columns.size() != inner_distinct_columns.size())
return false;
@ -129,16 +238,8 @@ namespace
if (!dag_stack.empty())
{
/// build actions DAG to find original column names
path_actions = dag_stack.back();
dag_stack.pop_back();
while (!dag_stack.empty())
{
ActionsDAGPtr clone = dag_stack.back()->clone();
dag_stack.pop_back();
path_actions->mergeInplace(std::move(*clone));
}
logActionsDAG("merged DAG:\n{}", path_actions);
path_actions = buildActionsForPlanPath(dag_stack);
logActionsDAG("distinct pass: merged DAG:\n{}", path_actions);
/// compare columns of two DISTINCTs
for (const auto & column : distinct_columns)
@ -162,6 +263,17 @@ namespace
return true;
}
/// DISTINCT can be removed if either of the checks succeeds.
/// The aggregation check runs first, the check against inner DISTINCT runs only if the first one fails.
bool canRemoveDistinct(const QueryPlan::Node * distinct_node)
{
    return passTillAggregation(distinct_node) || passTillDistinct(distinct_node);
}
}
///
@ -188,5 +300,4 @@ size_t tryRemoveRedundantDistinct(QueryPlan::Node * parent_node, QueryPlan::Node
return applied;
}
}

View File

@ -19,6 +19,8 @@ public:
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
const Aggregator::Params & getParams() const { return params; }
private:
void updateOutputStream() override;

View File

@ -208,3 +208,64 @@ Expression (Project names)
ReadFromStorage (SystemOne)
-- execute
['Istanbul','Berlin','Bensheim']
-- GROUP BY before DISTINCT with on the same columns => remove DISTINCT
-- query
SELECT DISTINCT a
FROM
(
SELECT
a,
sum(b) AS c
FROM
(
SELECT
x.number AS a,
y.number AS b
FROM numbers(3) AS x, numbers(3, 3) AS y
)
GROUP BY a
)
-- explain
Expression ((Project names + (Projection + (Change column names to column identifiers + (Project names + Projection)))))
Aggregating
Expression ((Before GROUP BY + (Change column names to column identifiers + (Project names + (Projection + DROP unused columns after JOIN)))))
Join (JOIN FillRightFirst)
Expression (Change column names to column identifiers)
ReadFromStorage (SystemNumbers)
Expression (Change column names to column identifiers)
ReadFromStorage (SystemNumbers)
-- execute
0
2
1
-- GROUP BY before DISTINCT with on different columns => do _not_ remove DISTINCT
-- query
SELECT DISTINCT c
FROM
(
SELECT
a,
sum(b) AS c
FROM
(
SELECT
x.number AS a,
y.number AS b
FROM numbers(3) AS x, numbers(3, 3) AS y
)
GROUP BY a
)
-- explain
Expression (Project names)
Distinct (DISTINCT)
Distinct (Preliminary DISTINCT)
Expression ((Projection + (Change column names to column identifiers + (Project names + Projection))))
Aggregating
Expression ((Before GROUP BY + (Change column names to column identifiers + (Project names + (Projection + DROP unused columns after JOIN)))))
Join (JOIN FillRightFirst)
Expression (Change column names to column identifiers)
ReadFromStorage (SystemNumbers)
Expression (Change column names to column identifiers)
ReadFromStorage (SystemNumbers)
-- execute
12

View File

@ -112,3 +112,39 @@ FROM
)
WHERE arrayJoin(cities) IN ['Berlin', 'Bensheim']"
run_query "$query"
# Case: outer DISTINCT is applied to `a`, which is the GROUP BY key of the subquery.
# Expectation (as stated by the label below): DISTINCT is removed from the plan.
echo "-- GROUP BY before DISTINCT with on the same columns => remove DISTINCT"
query="SELECT DISTINCT a
FROM
(
SELECT
a,
sum(b) AS c
FROM
(
SELECT
x.number AS a,
y.number AS b
FROM numbers(3) AS x, numbers(3, 3) AS y
)
GROUP BY a
)"
run_query "$query"
# Case: outer DISTINCT is applied to `c` (the sum(b) aggregate), not to the GROUP BY key `a`.
# Expectation (as stated by the label below): DISTINCT is kept in the plan.
echo "-- GROUP BY before DISTINCT with on different columns => do _not_ remove DISTINCT"
query="SELECT DISTINCT c
FROM
(
SELECT
a,
sum(b) AS c
FROM
(
SELECT
x.number AS a,
y.number AS b
FROM numbers(3) AS x, numbers(3, 3) AS y
)
GROUP BY a
)"
run_query "$query"