Refactor a bit more.

Nikolai Kochetov 2023-02-24 18:39:27 +00:00
parent c63bbbb374
commit 030a0ba7fb


@@ -22,6 +22,131 @@
namespace DB::QueryPlanOptimizations
{
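/// Walk down a linear chain of Expression/Filter steps and return the node holding the
/// ReadFromMergeTree step, or nullptr if any other step is encountered.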
static QueryPlan::Node * findReadingStep(QueryPlan::Node & node)
{
IQueryPlanStep * step = node.step.get();
if (auto * reading = typeid_cast<ReadFromMergeTree *>(step))
return &node;
if (node.children.size() != 1)
return nullptr;
if (typeid_cast<ExpressionStep *>(step) || typeid_cast<FilterStep *>(step))
return findReadingStep(*node.children.front());
return nullptr;
}
/// A common DAG built by merging the DAGs from a chain of Filter and Expression steps.
/// Additionally, for all the Filter steps, the filter conditions are collected into filter_nodes.
/// The remove_last_filter_node flag is set if the last step is a Filter step that should remove its filter column.
struct QueryDAG
{
ActionsDAGPtr dag;
ActionsDAG::NodeRawConstPtrs filter_nodes;
bool remove_last_filter_node = false;
bool build(QueryPlan::Node & node);
private:
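/// Merge the expression into the accumulated DAG, or start a new DAG from it.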
void appendExpression(const ActionsDAGPtr & expression)
{
if (dag)
dag->mergeInplace(std::move(*expression->clone()));
else
dag = expression->clone();
}
};
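/// Returns false if the subtree cannot be represented as a single DAG:
/// an unexpected step, an ARRAY JOIN, or a filter column missing from the outputs.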
bool QueryDAG::build(QueryPlan::Node & node)
{
IQueryPlanStep * step = node.step.get();
if (auto * reading = typeid_cast<ReadFromMergeTree *>(step))
{
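/// PREWHERE is executed by the reading step itself, so its row-level filter and
/// prewhere actions are collected as the innermost filters of the chain.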
if (const auto * prewhere_info = reading->getPrewhereInfo())
{
if (prewhere_info->row_level_filter)
{
remove_last_filter_node = false;
appendExpression(prewhere_info->row_level_filter);
if (const auto * filter_node = dag->tryFindInOutputs(prewhere_info->row_level_column_name))
filter_nodes.push_back(filter_node);
else
return false;
}
if (prewhere_info->prewhere_actions)
{
remove_last_filter_node = prewhere_info->remove_prewhere_column;
appendExpression(prewhere_info->prewhere_actions);
if (const auto * filter_node = dag->tryFindInOutputs(prewhere_info->prewhere_column_name))
filter_nodes.push_back(filter_node);
else
return false;
}
}
return true;
}
if (node.children.size() != 1)
return false;
if (!build(*node.children.front()))
return false;
if (auto * expression = typeid_cast<ExpressionStep *>(step))
{
const auto & actions = expression->getExpression();
if (actions->hasArrayJoin())
return false;
appendExpression(actions);
remove_last_filter_node = false;
return true;
}
if (auto * filter = typeid_cast<FilterStep *>(step))
{
const auto & actions = filter->getExpression();
if (actions->hasArrayJoin())
return false;
appendExpression(actions);
remove_last_filter_node = filter->removesFilterColumn();
const auto * filter_expression = dag->tryFindInOutputs(filter->getFilterColumnName());
if (!filter_expression)
return false;
filter_nodes.push_back(filter_expression);
return true;
}
return false;
}
bool canUseProjectionForReadingStep(ReadFromMergeTree * reading)
{
/// Probably some projection was already applied.
if (reading->hasAnalyzedResult())
return false;
if (reading->isQueryWithFinal())
return false;
if (reading->isQueryWithSampling())
return false;
if (reading->isParallelReadingEnabled())
return false;
// Currently projections don't support deduplication when moving parts between shards.
if (reading->getContext()->getSettingsRef().allow_experimental_query_deduplication)
return false;
return true;
}
/// Required analysis info from aggregate projection.
struct AggregateProjectionInfo
{
@@ -255,17 +380,120 @@ bool areAggregatesMatch(
return true;
}
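/// Query DAG used for matching against aggregate projections.
/// All collected filter conditions are combined into a single filter_node with and(),
/// which is appended to the DAG outputs.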
struct AggregateQueryDAG
{
ActionsDAGPtr dag;
const ActionsDAG::Node * filter_node = nullptr;
bool build(QueryPlan::Node & node)
{
QueryDAG query;
if (!query.build(node))
return false;
dag = std::move(query.dag);
auto filter_nodes = std::move(query.filter_nodes);
if (!filter_nodes.empty())
{
filter_node = filter_nodes.front();
if (filter_nodes.size() > 1)
{
FunctionOverloadResolverPtr func_builder_and =
std::make_unique<FunctionToOverloadResolverAdaptor>(
std::make_shared<FunctionAnd>());
filter_node = &dag->addFunction(func_builder_and, std::move(filter_nodes), {});
}
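/// Expose the (possibly combined) filter condition as a DAG output so it can be
/// matched against the projection keys later.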
dag->getOutputs().push_back(filter_node);
}
return true;
}
};
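/// Query DAG used for normal (non-aggregate) projections.
/// Filters are combined into one condition as well, but the header must stay compatible with
/// the original plan, so the last filter column is either dropped or replaced with a constant,
/// the same way FilterStep would do it.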
struct NormalQueryDAG
{
ActionsDAGPtr dag;
bool need_remove_column = false;
const ActionsDAG::Node * filter_node = nullptr;
bool build(QueryPlan::Node & node)
{
QueryDAG query;
if (!query.build(node))
return false;
dag = std::move(query.dag);
auto filter_nodes = std::move(query.filter_nodes);
need_remove_column = query.remove_last_filter_node;
if (!filter_nodes.empty())
{
auto & outputs = dag->getOutputs();
filter_node = filter_nodes.back();
if (filter_nodes.size() > 1)
{
/// Add a conjunction of all the filters.
if (need_remove_column)
{
/// Last filter column is not needed; remove it right here
size_t pos = 0;
while (pos < outputs.size() && outputs[pos] != filter_node)
++pos;
if (pos < outputs.size())
outputs.erase(outputs.begin() + pos);
}
else
{
/// The last filter is needed; we must replace it with a constant 1 column,
/// the same way FilterStep does, to keep the header compatible.
for (auto & output : outputs)
{
if (output == filter_node)
{
ColumnWithTypeAndName col;
col.name = filter_node->result_name;
col.type = filter_node->result_type;
col.column = col.type->createColumnConst(1, 1);
output = &dag->addColumn(std::move(col));
}
}
}
FunctionOverloadResolverPtr func_builder_and =
std::make_unique<FunctionToOverloadResolverAdaptor>(
std::make_shared<FunctionAnd>());
filter_node = &dag->addFunction(func_builder_and, std::move(filter_nodes), {});
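/// Prepend the combined condition to the outputs; it is only needed for filtering,
/// so it has to be removed afterwards.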
outputs.insert(outputs.begin(), filter_node);
need_remove_column = true;
}
}
if (dag)
{
dag->removeUnusedActions();
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Header {}, Query DAG: {}", header.dumpStructure(), dag->dumpDAG());
}
return true;
}
};
ActionsDAGPtr analyzeAggregateProjection(
const AggregateProjectionInfo & info,
const ActionsDAG & query_dag,
const ActionsDAG::Node * filter_node,
const AggregateQueryDAG & query,
const Names & keys,
const AggregateDescriptions & aggregates)
{
auto query_index = buildDAGIndex(query_dag);
auto query_index = buildDAGIndex(*query.dag);
auto proj_index = buildDAGIndex(*info.before_aggregation);
MatchedTrees::Matches matches = matchTrees(*info.before_aggregation, query_dag);
MatchedTrees::Matches matches = matchTrees(*info.before_aggregation, *query.dag);
// for (const auto & [node, match] : matches)
// {
@@ -298,8 +526,8 @@ ActionsDAGPtr analyzeAggregateProjection(
/// We need to add filter column to keys set.
/// It should be computable from projection keys.
/// It will be removed in FilterStep.
if (filter_node)
query_key_nodes.push_back(filter_node);
if (query.filter_node)
query_key_nodes.push_back(query.filter_node);
for (const auto & key : keys)
{
@@ -362,7 +590,7 @@ ActionsDAGPtr analyzeAggregateProjection(
/// Not a match and there is no matched child.
if (frame.node->type == ActionsDAG::ActionType::INPUT)
{
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Cannot find match for {}", frame.node->result_name);
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Cannot find match for {}", frame.node->result_name);
return {};
}
@@ -374,7 +602,7 @@ ActionsDAGPtr analyzeAggregateProjection(
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Folding actions by projection");
auto proj_dag = query_dag.foldActionsByProjection(new_inputs, query_key_nodes);
auto proj_dag = query.dag->foldActionsByProjection(new_inputs, query_key_nodes);
/// Just add all the aggregates to dag inputs.
auto & proj_dag_outputs = proj_dag->getOutputs();
@@ -384,125 +612,6 @@ ActionsDAGPtr analyzeAggregateProjection(
return proj_dag;
}
static QueryPlan::Node * findReadingStep(QueryPlan::Node & node)
{
IQueryPlanStep * step = node.step.get();
if (auto * reading = typeid_cast<ReadFromMergeTree *>(step))
return &node;
if (node.children.size() != 1)
return nullptr;
if (typeid_cast<ExpressionStep *>(step) || typeid_cast<FilterStep *>(step))
return findReadingStep(*node.children.front());
return nullptr;
}
static void appendExpression(ActionsDAGPtr & dag, const ActionsDAGPtr & expression)
{
if (dag)
dag->mergeInplace(std::move(*expression->clone()));
else
dag = expression->clone();
}
/// This function builds a common DAG by merging the DAGs from a chain of Filter and Expression steps.
/// Additionally, for all the Filter steps, the filter conditions are collected into filter_nodes.
/// The need_remove_column flag is set if the last step is a Filter step that should remove its filter column.
static bool buildQueryDAG(
QueryPlan::Node & node,
ActionsDAGPtr & dag,
ActionsDAG::NodeRawConstPtrs & filter_nodes,
bool & need_remove_column)
{
IQueryPlanStep * step = node.step.get();
if (auto * reading = typeid_cast<ReadFromMergeTree *>(step))
{
if (const auto * prewhere_info = reading->getPrewhereInfo())
{
if (prewhere_info->row_level_filter)
{
need_remove_column = false;
appendExpression(dag, prewhere_info->row_level_filter);
if (const auto * filter_node = dag->tryFindInOutputs(prewhere_info->row_level_column_name))
filter_nodes.push_back(filter_node);
else
return false;
}
if (prewhere_info->prewhere_actions)
{
need_remove_column = prewhere_info->remove_prewhere_column;
appendExpression(dag, prewhere_info->prewhere_actions);
if (const auto * filter_node = dag->tryFindInOutputs(prewhere_info->prewhere_column_name))
filter_nodes.push_back(filter_node);
else
return false;
}
}
return true;
}
if (node.children.size() != 1)
return false;
if (!buildQueryDAG(*node.children.front(), dag, filter_nodes, need_remove_column))
return false;
if (auto * expression = typeid_cast<ExpressionStep *>(step))
{
const auto & actions = expression->getExpression();
if (actions->hasArrayJoin())
return false;
appendExpression(dag, actions);
need_remove_column = false;
return true;
}
if (auto * filter = typeid_cast<FilterStep *>(step))
{
const auto & actions = filter->getExpression();
if (actions->hasArrayJoin())
return false;
appendExpression(dag, actions);
need_remove_column = filter->removesFilterColumn();
const auto * filter_expression = dag->tryFindInOutputs(filter->getFilterColumnName());
if (!filter_expression)
return false;
filter_nodes.push_back(filter_expression);
return true;
}
return false;
}
bool canUseProjectionForReadingStep(ReadFromMergeTree * reading)
{
/// Probably some projection was already applied.
if (reading->hasAnalyzedResult())
return false;
if (reading->isQueryWithFinal())
return false;
if (reading->isQueryWithSampling())
return false;
if (reading->isParallelReadingEnabled())
return false;
// Currently projections don't support deduplication when moving parts between shards.
if (reading->getContext()->getSettingsRef().allow_experimental_query_deduplication)
return false;
return true;
}
struct MinMaxProjectionCandidate
{
AggregateProjectionCandidate candidate;
@@ -514,6 +623,8 @@ struct AggregateProjectionCandidates
{
std::vector<AggregateProjectionCandidate> real;
std::optional<MinMaxProjectionCandidate> minmax_projection;
/// This flag means that DAG for projection candidate should be used in FilterStep.
bool has_filter = false;
};
@@ -548,30 +659,13 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Has agg projection");
ActionsDAGPtr dag;
bool need_remove_column = false; // not used here
ActionsDAG::NodeRawConstPtrs filter_nodes;
if (!buildQueryDAG(*node.children.front(), dag, filter_nodes, need_remove_column))
AggregateQueryDAG dag;
if (!dag.build(*node.children.front()))
return candidates;
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Query DAG: {}", dag->dumpDAG());
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Query DAG: {}", dag.dag->dumpDAG());
const ActionsDAG::Node * filter_node = nullptr;
if (!filter_nodes.empty())
{
filter_node = filter_nodes.front();
if (filter_nodes.size() > 1)
{
FunctionOverloadResolverPtr func_builder_and =
std::make_unique<FunctionToOverloadResolverAdaptor>(
std::make_shared<FunctionAnd>());
filter_node = &dag->addFunction(func_builder_and, std::move(filter_nodes), {});
}
dag->getOutputs().push_back(filter_node);
}
candidates.has_filter = filter_node;
candidates.has_filter = dag.filter_node;
if (can_use_minmax_projection)
{
@@ -579,7 +673,7 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Try projection {}", projection->name);
auto info = getAggregatingProjectionInfo(*projection, context, metadata, key_virtual_columns);
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection DAG {}", info.before_aggregation->dumpDAG());
if (auto proj_dag = analyzeAggregateProjection(info, *dag, filter_node, keys, aggregates))
if (auto proj_dag = analyzeAggregateProjection(info, dag, keys, aggregates))
{
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection analyzed DAG {}", proj_dag->dumpDAG());
AggregateProjectionCandidate candidate{.info = std::move(info), .dag = std::move(proj_dag)};
@@ -588,7 +682,7 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
auto block = reading.getMergeTreeData().getMinMaxCountProjectionBlock(
metadata,
candidate.dag->getRequiredColumnsNames(),
filter_node != nullptr,
dag.filter_node != nullptr,
query_info,
parts,
minmax_projection_normal_parts,
@@ -615,7 +709,7 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Try projection {}", projection->name);
auto info = getAggregatingProjectionInfo(*projection, context, metadata, key_virtual_columns);
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection DAG {}", info.before_aggregation->dumpDAG());
if (auto proj_dag = analyzeAggregateProjection(info, *dag, filter_node, keys, aggregates))
if (auto proj_dag = analyzeAggregateProjection(info, dag, keys, aggregates))
{
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection analyzed DAG {}", proj_dag->dumpDAG());
AggregateProjectionCandidate candidate{.info = std::move(info), .dag = std::move(proj_dag)};
@@ -823,6 +917,40 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
return true;
}
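/// This replaces the ActionsDAG::makeConvertingActions call used before: at this point the
/// projection stream is expected to differ from the main stream only in the constness of
/// some columns, so materializing those columns is enough to make the headers compatible.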
ActionsDAGPtr makeMaterializingDAG(const Block & proj_header, const Block main_header)
{
/// Materialize constant columns in case the main output header does not have them as constants.
/// This may happen e.g. if we have PREWHERE.
size_t num_columns = main_header.columns();
/// This is an error; it would cause a block structure mismatch later.
if (proj_header.columns() != num_columns)
return nullptr;
std::vector<size_t> const_positions;
for (size_t i = 0; i < num_columns; ++i)
{
auto col_proj = proj_header.getByPosition(i).column;
auto col_main = main_header.getByPosition(i).column;
bool is_proj_const = col_proj && isColumnConst(*col_proj);
bool is_main_const = col_main && isColumnConst(*col_main);
if (is_proj_const && !is_main_const)
const_positions.push_back(i);
}
if (const_positions.empty())
return nullptr;
ActionsDAGPtr dag = std::make_unique<ActionsDAG>(proj_header.getColumnsWithTypeAndName());
for (auto pos : const_positions)
{
auto & output = dag->getOutputs()[pos];
output = &dag->materializeNode(*output);
}
return dag;
}
bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
{
@@ -863,66 +991,15 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
if (normal_projections.empty())
return false;
ActionsDAGPtr dag;
ActionsDAG::NodeRawConstPtrs filter_nodes;
bool need_remove_column = false;
if (!buildQueryDAG(*iter->node->children.front(), dag, filter_nodes, need_remove_column))
return false;
const ActionsDAG::Node * filter_node = nullptr;
if (!filter_nodes.empty())
NormalQueryDAG query;
{
auto & outputs = dag->getOutputs();
filter_node = filter_nodes.back();
if (filter_nodes.size() > 1)
{
if (need_remove_column)
{
size_t pos = 0;
while (pos < outputs.size() && outputs[pos] != filter_node)
++pos;
if (pos < outputs.size())
outputs.erase(outputs.begin() + pos);
}
else
{
for (auto & output : outputs)
{
if (output == filter_node)
{
ColumnWithTypeAndName col;
col.name = filter_node->result_name;
col.type = filter_node->result_type;
col.column = col.type->createColumnConst(1, 1);
output = &dag->addColumn(std::move(col));
}
}
}
FunctionOverloadResolverPtr func_builder_and =
std::make_unique<FunctionToOverloadResolverAdaptor>(
std::make_shared<FunctionAnd>());
filter_node = &dag->addFunction(func_builder_and, std::move(filter_nodes), {});
outputs.insert(outputs.begin(), filter_node);
need_remove_column = true;
}
// else if (!need_remove_column)
// outputs.insert(outputs.begin(), filter_node);
}
if (dag)
{
dag->removeUnusedActions();
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Query DAG: {}", dag->dumpDAG());
if (!query.build(*iter->node->children.front()))
return false;
}
std::list<NormalProjectionCandidate> candidates;
NormalProjectionCandidate * best_candidate = nullptr;
//const Block & header = frame.node->step->getOutputStream().header;
const Names & required_columns = reading->getRealColumnNames();
const auto & parts = reading->getParts();
const auto & query_info = reading->getQueryInfo();
@@ -972,8 +1049,8 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
continue;
ActionDAGNodes added_filter_nodes;
if (filter_node)
added_filter_nodes.nodes.push_back(filter_node);
if (query.filter_node)
added_filter_nodes.nodes.push_back(query.filter_node);
auto projection_result_ptr = reader.estimateNumMarksToRead(
std::move(projection_parts),
@@ -1059,25 +1136,22 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
auto & projection_reading_node = nodes.emplace_back(QueryPlan::Node{.step = std::move(projection_reading)});
auto * next_node = &projection_reading_node;
if (dag)
if (query.dag)
{
auto & expr_or_filter_node = nodes.emplace_back();
if (filter_node)
if (query.filter_node)
{
//std::cerr << "======== " << projection_reading_node.step->getOutputStream().header.dumpStructure();
expr_or_filter_node.step = std::make_unique<FilterStep>(
projection_reading_node.step->getOutputStream(),
dag,
filter_node->result_name,
need_remove_column);
//std::cerr << "======2= " << expr_or_filter_node.step->getOutputStream().header.dumpStructure();
query.dag,
query.filter_node->result_name,
query.need_remove_column);
}
else
expr_or_filter_node.step = std::make_unique<ExpressionStep>(
projection_reading_node.step->getOutputStream(),
dag);
query.dag);
expr_or_filter_node.children.push_back(&projection_reading_node);
next_node = &expr_or_filter_node;
@@ -1087,31 +1161,26 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
{
/// All parts are taken from projection
iter->node->children.front() = next_node;
//optimizeAggregationInOrder(node, nodes);
}
else
{
const auto & main_stream = iter->node->children.front()->step->getOutputStream();
const auto * proj_stream = &next_node->step->getOutputStream();
if (!blocksHaveEqualStructure(proj_stream->header, main_stream.header))
if (auto materializing = makeMaterializingDAG(proj_stream->header, main_stream.header))
{
// auto convert_actions_dag = ActionsDAG::makeConvertingActions(
// proj_stream->header.getColumnsWithTypeAndName(),
// main_stream.header.getColumnsWithTypeAndName(),
// ActionsDAG::MatchColumnsMode::Name,
// true);
//std::cerr << "======3= " << next_node->step->getOutputStream().header.dumpStructure();
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
proj_stream->header.getColumnsWithTypeAndName(),
main_stream.header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name,
true);
auto converting = std::make_unique<ExpressionStep>(*proj_stream, convert_actions_dag);
auto converting = std::make_unique<ExpressionStep>(*proj_stream, materializing);
proj_stream = &converting->getOutputStream();
auto & expr_node = nodes.emplace_back();
expr_node.step = std::move(converting);
expr_node.children.push_back(next_node);
next_node = &expr_node;
//std::cerr << "======4= " << next_node->step->getOutputStream().header.dumpStructure();
}
auto & union_node = nodes.emplace_back();