Updating PrewhereInfo

This commit is contained in:
Nikolai Kochetov 2024-07-25 16:33:12 +00:00
parent fb271436a1
commit 869f6a6f10
16 changed files with 52 additions and 70 deletions

View File

@ -2230,12 +2230,11 @@ void ExpressionAnalysisResult::checkActions() const
/// Check that PREWHERE doesn't contain unusual actions. Unusual actions are that can change number of rows.
if (hasPrewhere())
{
auto check_actions = [](const std::optional<ActionsDAG> & actions)
auto check_actions = [](ActionsDAG & actions)
{
if (actions)
for (const auto & node : actions->getNodes())
if (node.type == ActionsDAG::ActionType::ARRAY_JOIN)
throw Exception(ErrorCodes::ILLEGAL_PREWHERE, "PREWHERE cannot contain ARRAY JOIN action");
for (const auto & node : actions.getNodes())
if (node.type == ActionsDAG::ActionType::ARRAY_JOIN)
throw Exception(ErrorCodes::ILLEGAL_PREWHERE, "PREWHERE cannot contain ARRAY JOIN action");
};
check_actions(prewhere_info->prewhere_actions);

View File

@ -937,7 +937,7 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis()
{
{
const auto & node
= query_info_copy.prewhere_info->prewhere_actions->findInOutputs(query_info_copy.prewhere_info->prewhere_column_name);
= query_info_copy.prewhere_info->prewhere_actions.findInOutputs(query_info_copy.prewhere_info->prewhere_column_name);
added_filter_nodes.nodes.push_back(&node);
}
@ -1058,7 +1058,7 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
if (analysis_result.prewhere_info)
{
header = analysis_result.prewhere_info->prewhere_actions->updateHeader(header);
header = analysis_result.prewhere_info->prewhere_actions.updateHeader(header);
if (analysis_result.prewhere_info->remove_prewhere_column)
header.erase(analysis_result.prewhere_info->prewhere_column_name);
}
@ -1521,7 +1521,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
auto prewhere_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
expressions.prewhere_info->prewhere_actions->clone(),
expressions.prewhere_info->prewhere_actions.clone(),
expressions.prewhere_info->prewhere_column_name,
expressions.prewhere_info->remove_prewhere_column);
@ -2066,7 +2066,7 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(QueryPlan & query_plan, c
});
}
auto filter_actions = std::make_shared<ExpressionActions>(prewhere_info.prewhere_actions->clone());
auto filter_actions = std::make_shared<ExpressionActions>(prewhere_info.prewhere_actions.clone());
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(
@ -2157,7 +2157,7 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
if (prewhere_info)
{
/// Get some columns directly from PREWHERE expression actions
auto prewhere_required_columns = prewhere_info->prewhere_actions->getRequiredColumns().getNames();
auto prewhere_required_columns = prewhere_info->prewhere_actions.getRequiredColumns().getNames();
columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
if (prewhere_info->row_level_filter)
@ -2229,7 +2229,7 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
if (prewhere_info)
{
NameSet columns_to_remove(columns_to_remove_after_prewhere.begin(), columns_to_remove_after_prewhere.end());
Block prewhere_actions_result = prewhere_info->prewhere_actions->getResultColumns();
Block prewhere_actions_result = prewhere_info->prewhere_actions.getResultColumns();
/// Populate required columns with the columns, added by PREWHERE actions and not removed afterwards.
/// XXX: looks hacky that we already know which columns after PREWHERE we won't need for sure.
@ -2268,7 +2268,7 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
{
/// Don't remove columns which are needed to be aliased.
for (const auto & name : required_columns)
prewhere_info->prewhere_actions->tryRestoreColumn(name);
prewhere_info->prewhere_actions.tryRestoreColumn(name);
/// Add physical columns required by prewhere actions.
for (const auto & column : required_columns_from_prewhere)
@ -2326,7 +2326,7 @@ std::optional<UInt64> InterpreterSelectQuery::getTrivialCount(UInt64 max_paralle
if (analysis_result.hasPrewhere())
{
auto & prewhere_info = analysis_result.prewhere_info;
filter_nodes.push_back(&prewhere_info->prewhere_actions->findInOutputs(prewhere_info->prewhere_column_name));
filter_nodes.push_back(&prewhere_info->prewhere_actions.findInOutputs(prewhere_info->prewhere_column_name));
if (prewhere_info->row_level_filter)
filter_nodes.push_back(&prewhere_info->row_level_filter->findInOutputs(prewhere_info->row_level_column_name));

View File

@ -437,7 +437,7 @@ void updatePrewhereOutputsIfNeeded(SelectQueryInfo & table_expression_query_info
std::unordered_set<const ActionsDAG::Node *> required_output_nodes;
for (const auto * input : prewhere_actions->getInputs())
for (const auto * input : prewhere_actions.getInputs())
{
if (required_columns.contains(input->result_name))
required_output_nodes.insert(input);
@ -446,7 +446,7 @@ void updatePrewhereOutputsIfNeeded(SelectQueryInfo & table_expression_query_info
if (required_output_nodes.empty())
return;
auto & prewhere_outputs = prewhere_actions->getOutputs();
auto & prewhere_outputs = prewhere_actions.getOutputs();
for (const auto & output : prewhere_outputs)
{
auto required_output_node_it = required_output_nodes.find(output);
@ -801,10 +801,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
if (storage->canMoveConditionsToPrewhere() && optimize_move_to_prewhere && (!supported_prewhere_columns || supported_prewhere_columns->contains(filter_info.column_name)))
{
if (!prewhere_info)
prewhere_info = std::make_shared<PrewhereInfo>();
if (!prewhere_info->prewhere_actions)
{
prewhere_info = std::make_shared<PrewhereInfo>();
prewhere_info->prewhere_actions = std::move(filter_info.actions);
prewhere_info->prewhere_column_name = filter_info.column_name;
prewhere_info->remove_prewhere_column = filter_info.do_remove_column;

View File

@ -56,7 +56,7 @@ void optimizePrewhere(Stack & stack, QueryPlan::Nodes &)
return;
const auto & storage_prewhere_info = source_step_with_filter->getPrewhereInfo();
if (storage_prewhere_info && storage_prewhere_info->prewhere_actions)
if (storage_prewhere_info)
return;
/// TODO: We can also check for UnionStep, such as StorageBuffer and local distributed plans.
@ -165,16 +165,16 @@ void optimizePrewhere(Stack & stack, QueryPlan::Nodes &)
{
prewhere_info->prewhere_column_name = conditions.front()->result_name;
if (prewhere_info->remove_prewhere_column)
prewhere_info->prewhere_actions->getOutputs().push_back(conditions.front());
prewhere_info->prewhere_actions.getOutputs().push_back(conditions.front());
}
else
{
prewhere_info->remove_prewhere_column = true;
FunctionOverloadResolverPtr func_builder_and = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>());
const auto * node = &prewhere_info->prewhere_actions->addFunction(func_builder_and, std::move(conditions), {});
const auto * node = &prewhere_info->prewhere_actions.addFunction(func_builder_and, std::move(conditions), {});
prewhere_info->prewhere_column_name = node->result_name;
prewhere_info->prewhere_actions->getOutputs().push_back(node);
prewhere_info->prewhere_actions.getOutputs().push_back(node);
}
source_step_with_filter->updatePrewhereInfo(prewhere_info);

View File

@ -18,7 +18,7 @@ void optimizePrimaryKeyConditionAndLimit(const Stack & stack)
const auto & storage_prewhere_info = source_step_with_filter->getPrewhereInfo();
if (storage_prewhere_info)
{
source_step_with_filter->addFilter(storage_prewhere_info->prewhere_actions->clone(), storage_prewhere_info->prewhere_column_name);
source_step_with_filter->addFilter(storage_prewhere_info->prewhere_actions.clone(), storage_prewhere_info->prewhere_column_name);
if (storage_prewhere_info->row_level_filter)
source_step_with_filter->addFilter(storage_prewhere_info->row_level_filter->clone(), storage_prewhere_info->row_level_column_name);
}

View File

@ -191,13 +191,11 @@ void buildSortingDAG(QueryPlan::Node & node, std::optional<ActionsDAG> & dag, Fi
/// Should ignore limit if there is filtering.
limit = 0;
if (prewhere_info->prewhere_actions)
{
//std::cerr << "====== Adding prewhere " << std::endl;
appendExpression(dag, *prewhere_info->prewhere_actions);
if (const auto * filter_expression = dag->tryFindInOutputs(prewhere_info->prewhere_column_name))
appendFixedColumnsFromFilterExpression(*filter_expression, fixed_columns);
}
//std::cerr << "====== Adding prewhere " << std::endl;
appendExpression(dag, prewhere_info->prewhere_actions);
if (const auto * filter_expression = dag->tryFindInOutputs(prewhere_info->prewhere_column_name))
appendFixedColumnsFromFilterExpression(*filter_expression, fixed_columns);
}
return;
}

View File

@ -128,15 +128,12 @@ bool QueryDAG::buildImpl(QueryPlan::Node & node, ActionsDAG::NodeRawConstPtrs &
return false;
}
if (prewhere_info->prewhere_actions)
{
appendExpression(*prewhere_info->prewhere_actions);
if (const auto * filter_expression
= findInOutputs(*dag, prewhere_info->prewhere_column_name, prewhere_info->remove_prewhere_column))
filter_nodes.push_back(filter_expression);
else
return false;
}
appendExpression(prewhere_info->prewhere_actions);
if (const auto * filter_expression
= findInOutputs(*dag, prewhere_info->prewhere_column_name, prewhere_info->remove_prewhere_column))
filter_nodes.push_back(filter_expression);
else
return false;
}
return true;
}

View File

@ -109,8 +109,7 @@ bool restorePrewhereInputs(PrewhereInfo & info, const NameSet & inputs)
if (info.row_level_filter)
added = added || restoreDAGInputs(*info.row_level_filter, inputs);
if (info.prewhere_actions)
added = added || restoreDAGInputs(*info.prewhere_actions, inputs);
added = added || restoreDAGInputs(info.prewhere_actions, inputs);
return added;
}
@ -175,9 +174,8 @@ static void updateSortDescriptionForOutputStream(
Block original_header = output_stream.header.cloneEmpty();
if (prewhere_info)
{
if (prewhere_info->prewhere_actions)
{
FindOriginalNodeForOutputName original_column_finder(*prewhere_info->prewhere_actions);
FindOriginalNodeForOutputName original_column_finder(prewhere_info->prewhere_actions);
for (auto & column : original_header)
{
const auto * original_node = original_column_finder.find(column.name);
@ -2131,7 +2129,6 @@ void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const
prefix.push_back(format_settings.indent_char);
prefix.push_back(format_settings.indent_char);
if (prewhere_info->prewhere_actions)
{
format_settings.out << prefix << "Prewhere filter" << '\n';
format_settings.out << prefix << "Prewhere filter column: " << prewhere_info->prewhere_column_name;
@ -2139,7 +2136,7 @@ void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const
format_settings.out << " (removed)";
format_settings.out << '\n';
auto expression = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions->clone());
auto expression = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions.clone());
expression->describeActions(format_settings.out, prefix);
}
@ -2169,12 +2166,11 @@ void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const
std::unique_ptr<JSONBuilder::JSONMap> prewhere_info_map = std::make_unique<JSONBuilder::JSONMap>();
prewhere_info_map->add("Need filter", prewhere_info->need_filter);
if (prewhere_info->prewhere_actions)
{
std::unique_ptr<JSONBuilder::JSONMap> prewhere_filter_map = std::make_unique<JSONBuilder::JSONMap>();
prewhere_filter_map->add("Prewhere filter column", prewhere_info->prewhere_column_name);
prewhere_filter_map->add("Prewhere filter remove filter column", prewhere_info->remove_prewhere_column);
auto expression = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions->clone());
auto expression = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions.clone());
prewhere_filter_map->add("Prewhere filter expression", expression->toTree());
prewhere_info_map->add("Prewhere filter", std::move(prewhere_filter_map));

View File

@ -34,9 +34,8 @@ Block SourceStepWithFilter::applyPrewhereActions(Block block, const PrewhereInfo
block.erase(prewhere_info->row_level_column_name);
}
if (prewhere_info->prewhere_actions)
{
block = prewhere_info->prewhere_actions->updateHeader(block);
block = prewhere_info->prewhere_actions.updateHeader(block);
auto & prewhere_column = block.getByName(prewhere_info->prewhere_column_name);
if (!prewhere_column.type->canBeUsedInBooleanContext())
@ -102,7 +101,6 @@ void SourceStepWithFilter::describeActions(FormatSettings & format_settings) con
prefix.push_back(format_settings.indent_char);
prefix.push_back(format_settings.indent_char);
if (prewhere_info->prewhere_actions)
{
format_settings.out << prefix << "Prewhere filter" << '\n';
format_settings.out << prefix << "Prewhere filter column: " << prewhere_info->prewhere_column_name;
@ -110,7 +108,7 @@ void SourceStepWithFilter::describeActions(FormatSettings & format_settings) con
format_settings.out << " (removed)";
format_settings.out << '\n';
auto expression = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions->clone());
auto expression = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions.clone());
expression->describeActions(format_settings.out, prefix);
}
@ -132,12 +130,11 @@ void SourceStepWithFilter::describeActions(JSONBuilder::JSONMap & map) const
std::unique_ptr<JSONBuilder::JSONMap> prewhere_info_map = std::make_unique<JSONBuilder::JSONMap>();
prewhere_info_map->add("Need filter", prewhere_info->need_filter);
if (prewhere_info->prewhere_actions)
{
std::unique_ptr<JSONBuilder::JSONMap> prewhere_filter_map = std::make_unique<JSONBuilder::JSONMap>();
prewhere_filter_map->add("Prewhere filter column", prewhere_info->prewhere_column_name);
prewhere_filter_map->add("Prewhere filter remove filter column", prewhere_info->remove_prewhere_column);
auto expression = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions->clone());
auto expression = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions.clone());
prewhere_filter_map->add("Prewhere filter expression", expression->toTree());
prewhere_info_map->add("Prewhere filter", std::move(prewhere_filter_map));

View File

@ -325,9 +325,8 @@ std::string PrewhereInfo::dump() const
ss << "row_level_filter " << row_level_filter->dumpDAG() << "\n";
}
if (prewhere_actions)
{
ss << "prewhere_actions " << prewhere_actions->dumpDAG() << "\n";
ss << "prewhere_actions " << prewhere_actions.dumpDAG() << "\n";
}
ss << "remove_prewhere_column " << remove_prewhere_column

View File

@ -329,7 +329,7 @@ void MergeTreePrefetchedReadPool::fillPerPartStatistics()
part_stat.sum_marks += range.end - range.begin;
const auto & columns = settings.merge_tree_determine_task_size_by_prewhere_columns && prewhere_info
? prewhere_info->prewhere_actions->getRequiredColumnsNames()
? prewhere_info->prewhere_actions.getRequiredColumnsNames()
: column_names;
part_stat.approx_size_of_mark = getApproximateSizeOfGranule(*read_info.data_part, columns);

View File

@ -65,7 +65,7 @@ static size_t calculateMinMarksPerTask(
/// Which means in turn that for most of the rows we will read only the columns from prewhere clause.
/// So it makes sense to use only them for the estimation.
const auto & columns = settings.merge_tree_determine_task_size_by_prewhere_columns && prewhere_info
? prewhere_info->prewhere_actions->getRequiredColumnsNames()
? prewhere_info->prewhere_actions.getRequiredColumnsNames()
: columns_to_read;
const size_t part_compressed_bytes = getApproxSizeOfPart(*part.data_part, columns);

View File

@ -59,7 +59,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
if (prewhere_info)
LOG_TEST(log, "Original PREWHERE DAG:\n{}\nPREWHERE actions:\n{}",
(prewhere_info->prewhere_actions ? prewhere_info->prewhere_actions->dumpDAG(): std::string("<nullptr>")),
prewhere_info->prewhere_actions.dumpDAG(),
(!prewhere_actions.steps.empty() ? prewhere_actions.dump() : std::string("<nullptr>")));
}
@ -96,7 +96,7 @@ PrewhereExprInfo MergeTreeSelectProcessor::getPrewhereActions(PrewhereInfoPtr pr
PrewhereExprStep prewhere_step
{
.type = PrewhereExprStep::Filter,
.actions = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions->clone(), actions_settings),
.actions = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions.clone(), actions_settings),
.filter_column_name = prewhere_info->prewhere_column_name,
.remove_filter_column = prewhere_info->remove_prewhere_column,
.need_filter = prewhere_info->need_filter,

View File

@ -216,11 +216,11 @@ const ActionsDAG::Node & addAndTrue(
/// 8. Add computation of the remaining outputs to the last step with the procedure similar to 4
bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, PrewhereExprInfo & prewhere)
{
if (!prewhere_info || !prewhere_info->prewhere_actions)
if (!prewhere_info)
return true;
/// 1. List all condition nodes that are combined with AND into PREWHERE condition
const auto & condition_root = prewhere_info->prewhere_actions->findInOutputs(prewhere_info->prewhere_column_name);
const auto & condition_root = prewhere_info->prewhere_actions.findInOutputs(prewhere_info->prewhere_column_name);
const bool is_conjunction = (condition_root.type == ActionsDAG::ActionType::FUNCTION && condition_root.function_base->getName() == "and");
if (!is_conjunction)
return false;
@ -306,7 +306,7 @@ bool tryBuildPrewhereSteps(PrewhereInfoPtr prewhere_info, const ExpressionAction
}
/// 6. Find all outputs of the original DAG
auto original_outputs = prewhere_info->prewhere_actions->getOutputs();
auto original_outputs = prewhere_info->prewhere_actions.getOutputs();
/// 7. Find all outputs that were computed in the already built DAGs, mark these nodes as outputs in the steps where they were computed
/// 8. Add computation of the remaining outputs to the last step with the procedure similar to 4
NameSet all_output_names;

View File

@ -45,7 +45,7 @@ struct PrewhereInfo
/// This actions are separate because prewhere condition should not be executed over filtered rows.
std::optional<ActionsDAG> row_level_filter;
/// Actions which are executed on block in order to get filter column for prewhere step.
std::optional<ActionsDAG> prewhere_actions;
ActionsDAG prewhere_actions;
String row_level_column_name;
String prewhere_column_name;
bool remove_prewhere_column = false;
@ -53,7 +53,7 @@ struct PrewhereInfo
bool generated_by_optimizer = false;
PrewhereInfo() = default;
explicit PrewhereInfo(std::optional<ActionsDAG> prewhere_actions_, String prewhere_column_name_)
explicit PrewhereInfo(ActionsDAG prewhere_actions_, String prewhere_column_name_)
: prewhere_actions(std::move(prewhere_actions_)), prewhere_column_name(std::move(prewhere_column_name_)) {}
std::string dump() const;
@ -65,8 +65,7 @@ struct PrewhereInfo
if (row_level_filter)
prewhere_info->row_level_filter = row_level_filter->clone();
if (prewhere_actions)
prewhere_info->prewhere_actions = prewhere_actions->clone();
prewhere_info->prewhere_actions = prewhere_actions.clone();
prewhere_info->row_level_column_name = row_level_column_name;
prewhere_info->prewhere_column_name = prewhere_column_name;

View File

@ -319,13 +319,12 @@ void StorageBuffer::read(
src_table_query_info.prewhere_info->row_level_filter->removeUnusedActions();
}
if (src_table_query_info.prewhere_info->prewhere_actions)
{
src_table_query_info.prewhere_info->prewhere_actions = ActionsDAG::merge(
actions_dag.clone(),
std::move(*src_table_query_info.prewhere_info->prewhere_actions));
std::move(src_table_query_info.prewhere_info->prewhere_actions));
src_table_query_info.prewhere_info->prewhere_actions->removeUnusedActions();
src_table_query_info.prewhere_info->prewhere_actions.removeUnusedActions();
}
}
@ -440,7 +439,7 @@ void StorageBuffer::read(
});
}
auto actions = std::make_shared<ExpressionActions>(query_info.prewhere_info->prewhere_actions->clone(), actions_settings);
auto actions = std::make_shared<ExpressionActions>(query_info.prewhere_info->prewhere_actions.clone(), actions_settings);
pipe_from_buffers.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(