Refactor a bit.

This commit is contained in:
Nikolai Kochetov 2023-02-27 16:59:32 +00:00
parent 9bf828cc98
commit 1e0ea2446e

View File

@ -22,21 +22,6 @@
namespace DB::QueryPlanOptimizations
{
/// Walks down the plan starting at `node` and looks for a ReadFromMergeTree step.
///
/// Returns the node whose step is ReadFromMergeTree, or nullptr when:
///  - a node on the way does not have exactly one child, or
///  - a step other than ExpressionStep / FilterStep is met before the reading step.
static QueryPlan::Node * findReadingStep(QueryPlan::Node & node)
{
    IQueryPlanStep * step = node.step.get();

    /// Only the result of the cast matters here, so no variable is bound to it.
    if (typeid_cast<ReadFromMergeTree *>(step))
        return &node;

    if (node.children.size() != 1)
        return nullptr;

    /// Expression and Filter steps are looked through; anything else stops the search.
    if (typeid_cast<ExpressionStep *>(step) || typeid_cast<FilterStep *>(step))
        return findReadingStep(*node.children.front());

    return nullptr;
}
/// This is a common DAG which is a merge of DAGs from Filter and Expression steps chain.
/// Additionally, for all the Filter steps, we collect filter conditions into filter_nodes.
/// Flag remove_last_filter_node is set if the last step is a Filter step that should remove its filter column.
@ -124,28 +109,109 @@ bool QueryDAG::build(QueryPlan::Node & node)
return false;
}
static bool canUseProjectionForReadingStep(ReadFromMergeTree * reading)
struct AggregateQueryDAG
{
/// Probably some projection already was applied.
if (reading->hasAnalyzedResult())
return false;
ActionsDAGPtr dag;
const ActionsDAG::Node * filter_node = nullptr;
if (reading->isQueryWithFinal())
return false;
bool build(QueryPlan::Node & node)
{
QueryDAG query;
if (!query.build(node))
return false;
if (reading->isQueryWithSampling())
return false;
dag = std::move(query.dag);
auto filter_nodes = std::move(query.filter_nodes);
if (reading->isParallelReadingEnabled())
return false;
if (!filter_nodes.empty())
{
filter_node = filter_nodes.front();
if (filter_nodes.size() > 1)
{
FunctionOverloadResolverPtr func_builder_and =
std::make_unique<FunctionToOverloadResolverAdaptor>(
std::make_shared<FunctionAnd>());
// Currently projections don't support deduplication when moving parts between shards.
if (reading->getContext()->getSettingsRef().allow_experimental_query_deduplication)
return false;
filter_node = &dag->addFunction(func_builder_and, std::move(filter_nodes), {});
}
return true;
}
dag->getOutputs().push_back(filter_node);
}
return true;
}
};
struct NormalQueryDAG
{
ActionsDAGPtr dag;
bool need_remove_column = false;
const ActionsDAG::Node * filter_node = nullptr;
bool build(QueryPlan::Node & node)
{
QueryDAG query;
if (!query.build(node))
return false;
dag = std::move(query.dag);
auto filter_nodes = std::move(query.filter_nodes);
need_remove_column = query.remove_last_filter_node;
if (!filter_nodes.empty())
{
auto & outputs = dag->getOutputs();
filter_node = filter_nodes.back();
if (filter_nodes.size() > 1)
{
/// Add a conjunction of all the filters.
if (need_remove_column)
{
/// Last filter column is not needed; remove it right here
size_t pos = 0;
while (pos < outputs.size() && outputs[pos] != filter_node)
++pos;
if (pos < outputs.size())
outputs.erase(outputs.begin() + pos);
}
else
{
/// Last filter is needed; we must replace it to constant 1,
/// As well as FilterStep does to make a compatible header.
for (auto & output : outputs)
{
if (output == filter_node)
{
ColumnWithTypeAndName col;
col.name = filter_node->result_name;
col.type = filter_node->result_type;
col.column = col.type->createColumnConst(1, 1);
output = &dag->addColumn(std::move(col));
}
}
}
FunctionOverloadResolverPtr func_builder_and =
std::make_unique<FunctionToOverloadResolverAdaptor>(
std::make_shared<FunctionAnd>());
filter_node = &dag->addFunction(func_builder_and, std::move(filter_nodes), {});
outputs.insert(outputs.begin(), filter_node);
need_remove_column = true;
}
}
if (dag)
{
dag->removeUnusedActions();
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Header {}, Query DAG: {}", header.dumpStructure(), dag->dumpDAG());
}
return true;
}
};
/// Required analysis info from aggregate projection.
struct AggregateProjectionInfo
@ -380,110 +446,6 @@ bool areAggregatesMatch(
return true;
}
struct AggregateQueryDAG
{
ActionsDAGPtr dag;
const ActionsDAG::Node * filter_node = nullptr;
bool build(QueryPlan::Node & node)
{
QueryDAG query;
if (!query.build(node))
return false;
dag = std::move(query.dag);
auto filter_nodes = std::move(query.filter_nodes);
if (!filter_nodes.empty())
{
filter_node = filter_nodes.front();
if (filter_nodes.size() > 1)
{
FunctionOverloadResolverPtr func_builder_and =
std::make_unique<FunctionToOverloadResolverAdaptor>(
std::make_shared<FunctionAnd>());
filter_node = &dag->addFunction(func_builder_and, std::move(filter_nodes), {});
}
dag->getOutputs().push_back(filter_node);
}
return true;
}
};
struct NormalQueryDAG
{
ActionsDAGPtr dag;
bool need_remove_column = false;
const ActionsDAG::Node * filter_node = nullptr;
bool build(QueryPlan::Node & node)
{
QueryDAG query;
if (!query.build(node))
return false;
dag = std::move(query.dag);
auto filter_nodes = std::move(query.filter_nodes);
need_remove_column = query.remove_last_filter_node;
if (!filter_nodes.empty())
{
auto & outputs = dag->getOutputs();
filter_node = filter_nodes.back();
if (filter_nodes.size() > 1)
{
/// Add a conjunction of all the filters.
if (need_remove_column)
{
/// Last filter column is not needed; remove it right here
size_t pos = 0;
while (pos < outputs.size() && outputs[pos] != filter_node)
++pos;
if (pos < outputs.size())
outputs.erase(outputs.begin() + pos);
}
else
{
/// Last filter is needed; we must replace it to constant 1,
/// As well as FilterStep does to make a compatible header.
for (auto & output : outputs)
{
if (output == filter_node)
{
ColumnWithTypeAndName col;
col.name = filter_node->result_name;
col.type = filter_node->result_type;
col.column = col.type->createColumnConst(1, 1);
output = &dag->addColumn(std::move(col));
}
}
}
FunctionOverloadResolverPtr func_builder_and =
std::make_unique<FunctionToOverloadResolverAdaptor>(
std::make_shared<FunctionAnd>());
filter_node = &dag->addFunction(func_builder_and, std::move(filter_nodes), {});
outputs.insert(outputs.begin(), filter_node);
need_remove_column = true;
}
}
if (dag)
{
dag->removeUnusedActions();
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Header {}, Query DAG: {}", header.dumpStructure(), dag->dumpDAG());
}
return true;
}
};
ActionsDAGPtr analyzeAggregateProjection(
const AggregateProjectionInfo & info,
const AggregateQueryDAG & query,
@ -797,6 +759,43 @@ static bool analyzeProjectionCandidate(
return true;
}
/// Walks down the plan starting at `node` and looks for a ReadFromMergeTree step.
///
/// Returns the node whose step is ReadFromMergeTree, or nullptr when:
///  - a node on the way does not have exactly one child, or
///  - a step other than ExpressionStep / FilterStep is met before the reading step.
static QueryPlan::Node * findReadingStep(QueryPlan::Node & node)
{
    IQueryPlanStep * step = node.step.get();

    /// Only the result of the cast matters here, so no variable is bound to it.
    if (typeid_cast<ReadFromMergeTree *>(step))
        return &node;

    if (node.children.size() != 1)
        return nullptr;

    /// Expression and Filter steps are looked through; anything else stops the search.
    if (typeid_cast<ExpressionStep *>(step) || typeid_cast<FilterStep *>(step))
        return findReadingStep(*node.children.front());

    return nullptr;
}
/// Decides whether a projection may be applied to the given reading step.
/// Returns false as soon as any of the checks below rejects the step.
static bool canUseProjectionForReadingStep(ReadFromMergeTree * reading)
{
    /// An already analyzed result probably means that some projection was applied before.
    /// Reads with FINAL, with sampling, or with parallel reading enabled are rejected as well.
    const bool rejected_by_read_mode = reading->hasAnalyzedResult()
        || reading->isQueryWithFinal()
        || reading->isQueryWithSampling()
        || reading->isParallelReadingEnabled();

    if (rejected_by_read_mode)
        return false;

    /// Currently projections don't support deduplication when moving parts between shards.
    if (reading->getContext()->getSettingsRef().allow_experimental_query_deduplication)
        return false;

    return true;
}
bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
{
if (node.children.size() != 1)