Cleaner FilterDAGInfo.

This commit is contained in:
Nikolai Kochetov 2024-07-12 17:19:30 +00:00
parent 1237f93182
commit a6e737ef2a
11 changed files with 49 additions and 56 deletions

View File

@ -1249,31 +1249,30 @@ bool ActionsDAG::removeUnusedResult(const std::string & column_name)
ActionsDAGPtr ActionsDAG::clone(const ActionsDAG * from)
{
std::unordered_map<const Node *, Node *> old_to_new_nodes;
return ActionsDAG::clone(from, old_to_new_nodes);
if (from == nullptr)
return nullptr;
return std::make_unique<ActionsDAG>(ActionsDAG::clone(*from, old_to_new_nodes));
}
ActionsDAGPtr ActionsDAG::clone(const ActionsDAG * from, std::unordered_map<const Node *, Node *> & old_to_new_nodes)
ActionsDAG ActionsDAG::clone(const ActionsDAG & from, std::unordered_map<const Node *, Node *> & old_to_new_nodes)
{
if (!from)
return nullptr;
ActionsDAG actions;
auto actions = std::make_unique<ActionsDAG>();
for (const auto & node : from->nodes)
for (const auto & node : from.nodes)
{
auto & copy_node = actions->nodes.emplace_back(node);
auto & copy_node = actions.nodes.emplace_back(node);
old_to_new_nodes[&node] = &copy_node;
}
for (auto & node : actions->nodes)
for (auto & node : actions.nodes)
for (auto & child : node.children)
child = old_to_new_nodes[child];
for (const auto & output_node : from->outputs)
actions->outputs.push_back(old_to_new_nodes[output_node]);
for (const auto & output_node : from.outputs)
actions.outputs.push_back(old_to_new_nodes[output_node]);
for (const auto & input_node : from->inputs)
actions->inputs.push_back(old_to_new_nodes[input_node]);
for (const auto & input_node : from.inputs)
actions.inputs.push_back(old_to_new_nodes[input_node]);
return actions;
}

View File

@ -263,7 +263,7 @@ public:
static ActionsDAGPtr clone(const ActionsDAGPtr & from) { return clone(from.get()); }
static ActionsDAGPtr clone(const ActionsDAG * from);
static ActionsDAGPtr clone(const ActionsDAG * from, std::unordered_map<const Node *, Node *> & old_to_new_nodes);
static ActionsDAG clone(const ActionsDAG & from, std::unordered_map<const Node *, Node *> & old_to_new_nodes);
static ActionsDAGPtr cloneSubDAG(const NodeRawConstPtrs & outputs, bool remove_aliases);

View File

@ -75,7 +75,7 @@ ExpressionActionsPtr ExpressionActions::clone() const
auto copy = std::make_shared<ExpressionActions>(ExpressionActions());
std::unordered_map<const Node *, Node *> copy_map;
copy->actions_dag = std::move(*ActionsDAG::clone(&actions_dag, copy_map));
copy->actions_dag = ActionsDAG::clone(actions_dag, copy_map);
copy->actions = actions;
for (auto & action : copy->actions)
action.node = copy_map[action.node];

View File

@ -1922,7 +1922,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
if (storage && additional_filter)
{
Names columns_for_additional_filter = additional_filter->actions->getRequiredColumnsNames();
Names columns_for_additional_filter = additional_filter->actions.getRequiredColumnsNames();
additional_required_columns_after_prewhere.insert(additional_required_columns_after_prewhere.end(),
columns_for_additional_filter.begin(), columns_for_additional_filter.end());
}

View File

@ -178,12 +178,12 @@ FilterDAGInfoPtr generateFilterActions(
filter_info->actions = std::move(analyzer.simpleSelectActions()->dag);
filter_info->column_name = expr_list->children.at(0)->getColumnName();
filter_info->actions->removeUnusedActions(NameSet{filter_info->column_name});
filter_info->actions.removeUnusedActions(NameSet{filter_info->column_name});
for (const auto * node : filter_info->actions->getInputs())
filter_info->actions->getOutputs().push_back(node);
for (const auto * node : filter_info->actions.getInputs())
filter_info->actions.getOutputs().push_back(node);
auto required_columns_from_filter = filter_info->actions->getRequiredColumns();
auto required_columns_from_filter = filter_info->actions.getRequiredColumns();
for (const auto & column : required_columns_from_filter)
{
@ -1486,7 +1486,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
{
auto row_level_security_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
std::move(*ActionsDAG::clone(&*expressions.filter_info->actions)),
std::move(*ActionsDAG::clone(&expressions.filter_info->actions)),
expressions.filter_info->column_name,
expressions.filter_info->do_remove_column);
@ -1612,7 +1612,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
{
auto row_level_security_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
std::move(*ActionsDAG::clone(&*expressions.filter_info->actions)),
std::move(*ActionsDAG::clone(&expressions.filter_info->actions)),
expressions.filter_info->column_name,
expressions.filter_info->do_remove_column);
@ -1620,11 +1620,11 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
query_plan.addStep(std::move(row_level_security_step));
}
const auto add_filter_step = [&](const auto & new_filter_info, const std::string & description)
const auto add_filter_step = [&](auto & new_filter_info, const std::string & description)
{
auto filter_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
std::move(*ActionsDAG::clone(&*new_filter_info->actions)),
std::move(new_filter_info->actions),
new_filter_info->column_name,
new_filter_info->do_remove_column);
@ -2107,7 +2107,7 @@ void InterpreterSelectQuery::applyFiltersToPrewhereInAnalysis(ExpressionAnalysis
else
{
/// Add row level security actions to prewhere.
analysis.prewhere_info->row_level_filter = std::move(*analysis.filter_info->actions);
analysis.prewhere_info->row_level_filter = std::move(analysis.filter_info->actions);
analysis.prewhere_info->row_level_column_name = std::move(analysis.filter_info->column_name);
analysis.filter_info = nullptr;
}

View File

@ -1128,11 +1128,11 @@ void addAdditionalFilterStepIfNeeded(QueryPlan & query_plan,
auto fake_table_expression = std::make_shared<TableNode>(std::move(storage), query_context);
auto filter_info = buildFilterInfo(additional_result_filter_ast, fake_table_expression, planner_context, std::move(fake_name_set));
if (!filter_info.actions || !query_plan.isInitialized())
if (!query_plan.isInitialized())
return;
auto filter_step = std::make_unique<FilterStep>(query_plan.getCurrentDataStream(),
std::move(*filter_info.actions),
std::move(filter_info.actions),
filter_info.column_name,
filter_info.do_remove_column);
filter_step->setStepDescription("additional result filter");

View File

@ -458,7 +458,7 @@ void updatePrewhereOutputsIfNeeded(SelectQueryInfo & table_expression_query_info
prewhere_outputs.insert(prewhere_outputs.end(), required_output_nodes.begin(), required_output_nodes.end());
}
FilterDAGInfo buildRowPolicyFilterIfNeeded(const StoragePtr & storage,
std::optional<FilterDAGInfo> buildRowPolicyFilterIfNeeded(const StoragePtr & storage,
SelectQueryInfo & table_expression_query_info,
PlannerContextPtr & planner_context,
std::set<std::string> & used_row_policies)
@ -479,7 +479,7 @@ FilterDAGInfo buildRowPolicyFilterIfNeeded(const StoragePtr & storage,
return buildFilterInfo(row_policy_filter->expression, table_expression_query_info.table_expression, planner_context);
}
FilterDAGInfo buildCustomKeyFilterIfNeeded(const StoragePtr & storage,
std::optional<FilterDAGInfo> buildCustomKeyFilterIfNeeded(const StoragePtr & storage,
SelectQueryInfo & table_expression_query_info,
PlannerContextPtr & planner_context)
{
@ -513,7 +513,7 @@ FilterDAGInfo buildCustomKeyFilterIfNeeded(const StoragePtr & storage,
}
/// Apply filters from additional_table_filters setting
FilterDAGInfo buildAdditionalFiltersIfNeeded(const StoragePtr & storage,
std::optional<FilterDAGInfo> buildAdditionalFiltersIfNeeded(const StoragePtr & storage,
const String & table_expression_alias,
SelectQueryInfo & table_expression_query_info,
PlannerContextPtr & planner_context)
@ -789,9 +789,6 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
std::vector<std::pair<FilterDAGInfo, std::string>> where_filters;
const auto add_filter = [&](FilterDAGInfo & filter_info, std::string description)
{
if (!filter_info.actions)
return;
bool is_final = table_expression_query_info.table_expression_modifiers
&& table_expression_query_info.table_expression_modifiers->hasFinal();
bool optimize_move_to_prewhere
@ -805,14 +802,14 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
if (!prewhere_info->prewhere_actions)
{
prewhere_info->prewhere_actions = std::move(*filter_info.actions);
prewhere_info->prewhere_actions = std::move(filter_info.actions);
prewhere_info->prewhere_column_name = filter_info.column_name;
prewhere_info->remove_prewhere_column = filter_info.do_remove_column;
prewhere_info->need_filter = true;
}
else if (!prewhere_info->row_level_filter)
{
prewhere_info->row_level_filter = std::move(*filter_info.actions);
prewhere_info->row_level_filter = std::move(filter_info.actions);
prewhere_info->row_level_column_name = filter_info.column_name;
prewhere_info->need_filter = true;
}
@ -830,17 +827,18 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
auto row_policy_filter_info
= buildRowPolicyFilterIfNeeded(storage, table_expression_query_info, planner_context, used_row_policies);
if (row_policy_filter_info.actions)
table_expression_data.setRowLevelFilterActions(ActionsDAG::clone(&*row_policy_filter_info.actions));
add_filter(row_policy_filter_info, "Row-level security filter");
if (row_policy_filter_info)
{
table_expression_data.setRowLevelFilterActions(ActionsDAG::clone(&row_policy_filter_info->actions));
add_filter(*row_policy_filter_info, "Row-level security filter");
}
if (query_context->getParallelReplicasMode() == Context::ParallelReplicasMode::CUSTOM_KEY)
{
if (settings.parallel_replicas_count > 1)
{
auto parallel_replicas_custom_key_filter_info
= buildCustomKeyFilterIfNeeded(storage, table_expression_query_info, planner_context);
add_filter(parallel_replicas_custom_key_filter_info, "Parallel replicas custom key filter");
if (auto parallel_replicas_custom_key_filter_info= buildCustomKeyFilterIfNeeded(storage, table_expression_query_info, planner_context))
add_filter(*parallel_replicas_custom_key_filter_info, "Parallel replicas custom key filter");
}
else if (auto * distributed = typeid_cast<StorageDistributed *>(storage.get());
distributed && query_context->canUseParallelReplicasCustomKey(*distributed->getCluster()))
@ -850,9 +848,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
}
const auto & table_expression_alias = table_expression->getOriginalAlias();
auto additional_filters_info
= buildAdditionalFiltersIfNeeded(storage, table_expression_alias, table_expression_query_info, planner_context);
add_filter(additional_filters_info, "additional filter");
if (auto additional_filters_info = buildAdditionalFiltersIfNeeded(storage, table_expression_alias, table_expression_query_info, planner_context))
add_filter(*additional_filters_info, "additional filter");
from_stage = storage->getQueryProcessingStage(
query_context, select_query_options.to_stage, storage_snapshot, table_expression_query_info);
@ -967,11 +964,10 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
for (auto && [filter_info, description] : where_filters)
{
if (query_plan.isInitialized() &&
from_stage == QueryProcessingStage::FetchColumns &&
filter_info.actions)
from_stage == QueryProcessingStage::FetchColumns)
{
auto filter_step = std::make_unique<FilterStep>(query_plan.getCurrentDataStream(),
std::move(*filter_info.actions),
std::move(filter_info.actions),
filter_info.column_name,
filter_info.do_remove_column);
filter_step->setStepDescription(description);

View File

@ -340,10 +340,8 @@ std::string FilterDAGInfo::dump() const
WriteBufferFromOwnString ss;
ss << "FilterDAGInfo for column '" << column_name <<"', do_remove_column "
<< do_remove_column << "\n";
if (actions)
{
ss << "actions " << actions->dumpDAG() << "\n";
}
ss << "actions " << actions.dumpDAG() << "\n";
return ss.str();
}

View File

@ -7058,7 +7058,7 @@ ActionDAGNodes MergeTreeData::getFiltersForPrimaryKeyAnalysis(const InterpreterS
ActionDAGNodes filter_nodes;
if (auto additional_filter_info = select.getAdditionalQueryInfo())
filter_nodes.nodes.push_back(&additional_filter_info->actions->findInOutputs(additional_filter_info->column_name));
filter_nodes.nodes.push_back(&additional_filter_info->actions.findInOutputs(additional_filter_info->column_name));
if (before_where)
filter_nodes.nodes.push_back(&before_where->dag.findInOutputs(where_column_name));

View File

@ -93,7 +93,7 @@ struct FilterInfo
/// Same as FilterInfo, but with ActionsDAG.
struct FilterDAGInfo
{
std::optional<ActionsDAG> actions;
ActionsDAG actions;
String column_name;
bool do_remove_column = false;

View File

@ -407,7 +407,7 @@ void ReadFromMerge::addFilter(FilterDAGInfo filter)
{
output_stream->header = FilterTransform::transformHeader(
output_stream->header,
filter.actions ? &*filter.actions : nullptr,
&filter.actions,
filter.column_name,
filter.do_remove_column);
pushed_down_filters.push_back(std::move(filter));
@ -662,7 +662,7 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
{
auto filter_step = std::make_unique<FilterStep>(
child.plan.getCurrentDataStream(),
std::move(*ActionsDAG::clone(&*filter_info.actions)),
std::move(*ActionsDAG::clone(&filter_info.actions)),
filter_info.column_name,
filter_info.do_remove_column);
@ -1565,7 +1565,7 @@ bool ReadFromMerge::requestReadingInOrder(InputOrderInfoPtr order_info_)
void ReadFromMerge::applyFilters(ActionDAGNodes added_filter_nodes)
{
for (const auto & filter_info : pushed_down_filters)
added_filter_nodes.nodes.push_back(&filter_info.actions->findInOutputs(filter_info.column_name));
added_filter_nodes.nodes.push_back(&filter_info.actions.findInOutputs(filter_info.column_name));
SourceStepWithFilter::applyFilters(added_filter_nodes);