Fix some tests.

This commit is contained in:
Nikolai Kochetov 2024-01-09 16:31:16 +00:00
parent fbd71ee15e
commit d1902cdba0
6 changed files with 208 additions and 76 deletions

View File

@ -1765,13 +1765,13 @@ ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split
}
/// Input from second DAG should also be in the first.
if (copy.type == ActionType::INPUT)
{
auto & input_copy = first_nodes.emplace_back(*cur.node);
assert(cur_data.to_first == nullptr);
cur_data.to_first = &input_copy;
new_inputs.push_back(cur.node);
}
// if (copy.type == ActionType::INPUT)
// {
// auto & input_copy = first_nodes.emplace_back(*cur.node);
// assert(cur_data.to_first == nullptr);
// cur_data.to_first = &input_copy;
// new_inputs.push_back(cur.node);
// }
}
else
{
@ -1790,11 +1790,12 @@ ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split
/// If this node is needed in result, add it as input.
Node input_node;
input_node.type = ActionType::INPUT;
input_node.result_type = node.result_type;
input_node.result_name = node.result_name;
input_node.result_type = cur.node->result_type;
input_node.result_name = cur.node->result_name;
cur_data.to_second = &second_nodes.emplace_back(std::move(input_node));
new_inputs.push_back(cur.node);
if (cur.node->type != ActionType::INPUT)
new_inputs.push_back(cur.node);
}
}
}
@ -1810,14 +1811,29 @@ ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split
for (const auto * input_node : inputs)
{
const auto & cur = data[input_node];
first_inputs.push_back(cur.to_first);
if (cur.to_first)
{
first_inputs.push_back(cur.to_first);
if (cur.to_second)
first_outputs.push_back(cur.to_first);
}
}
for (const auto * input : new_inputs)
{
const auto & cur = data[input];
second_inputs.push_back(cur.to_second);
first_outputs.push_back(cur.to_first);
if (cur.to_second)
second_inputs.push_back(cur.to_second);
if (cur.to_first)
first_outputs.push_back(cur.to_first);
}
for (const auto * input_node : inputs)
{
const auto & cur = data[input_node];
if (cur.to_second)
second_inputs.push_back(cur.to_second);
}
auto first_actions = std::make_shared<ActionsDAG>();

View File

@ -62,6 +62,20 @@ namespace ErrorCodes
namespace QueryPlanOptimizations
{
static void removeFromOutput(ActionsDAG & dag, const std::string name)
{
const auto * node = &dag.findInOutputs(name);
auto & outputs = dag.getOutputs();
for (size_t i = 0; i < outputs.size(); ++i)
{
if (node == outputs[i])
{
outputs.erase(outputs.begin() + i);
return;
}
}
}
void optimizePrewhere(Stack & stack, QueryPlan::Nodes &)
{
if (stack.size() < 3)
@ -172,7 +186,7 @@ void optimizePrewhere(Stack & stack, QueryPlan::Nodes &)
read_from_merge_tree->getContext(),
is_final);
if (!optimize_result.fully_moved_to_prewhere && optimize_result.prewhere_nodes.empty())
if (optimize_result.prewhere_nodes.empty())
return;
PrewhereInfoPtr prewhere_info;
@ -182,55 +196,102 @@ void optimizePrewhere(Stack & stack, QueryPlan::Nodes &)
prewhere_info = std::make_shared<PrewhereInfo>();
prewhere_info->need_filter = true;
// std::cerr << filter_step->getExpression()->dumpDAG() << std::endl;
// QueryPlan::Node * replace_old_filter_node = nullptr;
// bool remove_filter_node = false;
if (!optimize_result.fully_moved_to_prewhere)
{
auto split_result = filter_step->getExpression()->split(optimize_result.prewhere_nodes, true);
ActionsDAG::NodeRawConstPtrs conditions;
conditions.reserve(split_result.split_nodes_mapping.size());
for (const auto * condition : optimize_result.prewhere_nodes)
conditions.push_back(split_result.split_nodes_mapping.at(condition));
auto filter_expression = filter_step->getExpression();
const auto & filter_column_name = filter_step->getFilterColumnName();
prewhere_info->prewhere_actions = std::move(split_result.first);
if (optimize_result.fully_moved_to_prewhere && filter_step->removesFilterColumn())
{
removeFromOutput(*filter_expression, filter_column_name);
auto & outputs = filter_expression->getOutputs();
size_t size = outputs.size();
outputs.insert(outputs.end(), optimize_result.prewhere_nodes.begin(), optimize_result.prewhere_nodes.end());
filter_expression->removeUnusedActions(false);
outputs.resize(size);
}
// std::cerr << "!!!!!!!!!!!!!!!!\n";
// if (!optimize_result.fully_moved_to_prewhere)
// {
auto split_result = filter_step->getExpression()->split(optimize_result.prewhere_nodes, true);
// std::cerr << split_result.first->dumpDAG() << std::endl;
// std::cerr << split_result.second->dumpDAG() << std::endl;
// for (const auto * input : split_result.first->getInputs())
// std::cerr << "in 1" << input->result_name << std::endl;
// for (const auto * input : split_result.second->getInputs())
// std::cerr << "in 2" << input->result_name << std::endl;
ActionsDAG::NodeRawConstPtrs conditions;
conditions.reserve(split_result.split_nodes_mapping.size());
for (const auto * condition : optimize_result.prewhere_nodes)
{
// std::cerr << ".. " << condition->result_name << std::endl;
conditions.push_back(split_result.split_nodes_mapping.at(condition));
}
prewhere_info->prewhere_actions = std::move(split_result.first);
prewhere_info->remove_prewhere_column = optimize_result.fully_moved_to_prewhere && filter_step->removesFilterColumn();
if (conditions.size() == 1)
{
prewhere_info->prewhere_column_name = conditions.front()->result_name;
prewhere_info->prewhere_actions->getOutputs().push_back(conditions.front());
}
else
{
prewhere_info->remove_prewhere_column = true;
if (conditions.size() == 1)
{
for (const auto * output : prewhere_info->prewhere_actions->getOutputs())
{
if (output == conditions.front())
prewhere_info->remove_prewhere_column = false;
}
FunctionOverloadResolverPtr func_builder_and = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>());
const auto * node = &prewhere_info->prewhere_actions->addFunction(func_builder_and, std::move(conditions), {});
prewhere_info->prewhere_column_name = node->result_name;
prewhere_info->prewhere_actions->getOutputs().push_back(node);
}
prewhere_info->prewhere_column_name = conditions.front()->result_name;
}
else
{
// std::cerr << read_from_merge_tree->getOutputStream().header.dumpStructure() << std::endl;
// std::cerr << read_from_merge_tree->getOutputStream().header.dumpIndex() << std::endl;
FunctionOverloadResolverPtr func_builder_and = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>());
const auto * node = &prewhere_info->prewhere_actions->addFunction(func_builder_and, std::move(conditions), {});
prewhere_info->prewhere_column_name = node->result_name;
prewhere_info->prewhere_actions->getOutputs().push_back(node);
}
read_from_merge_tree->updatePrewhereInfo(prewhere_info);
read_from_merge_tree->updatePrewhereInfo(prewhere_info);
// std::cerr << read_from_merge_tree->getOutputStream().header.dumpStructure() << std::endl;
// std::cerr << read_from_merge_tree->getOutputStream().header.dumpIndex() << std::endl;
if (!optimize_result.fully_moved_to_prewhere)
{
filter_node->step = std::make_unique<FilterStep>(
read_from_merge_tree->getOutputStream(),
std::move(split_result.second),
filter_step->getFilterColumnName(),
filter_step->removesFilterColumn());
return;
}
else
{
// std::cerr << split_result.second->dumpDAG() << std::endl;
// std::cerr << read_from_merge_tree->getOutputStream().header.dumpStructure() << std::endl;
// std::cerr << read_from_merge_tree->getOutputStream().header.dumpIndex() << std::endl;
prewhere_info->prewhere_actions = filter_step->getExpression();
prewhere_info->prewhere_column_name = filter_step->getFilterColumnName();
prewhere_info->remove_prewhere_column = filter_step->removesFilterColumn();
filter_node->step = std::make_unique<ExpressionStep>(
read_from_merge_tree->getOutputStream(),
std::move(split_result.second));
}
// return;
// }
read_from_merge_tree->updatePrewhereInfo(prewhere_info);
// std::cerr << "!!!!!!!!!!!!!!!!\n";
// prewhere_info->prewhere_actions = filter_step->getExpression();
// prewhere_info->prewhere_actions->projectInput(false);
// std::cerr << prewhere_info->prewhere_actions->dumpDAG() << std::endl;
// prewhere_info->prewhere_column_name = filter_step->getFilterColumnName();
// prewhere_info->remove_prewhere_column = filter_step->removesFilterColumn();
// read_from_merge_tree->updatePrewhereInfo(prewhere_info);
// replace_old_filter_node = frame.node;
// remove_filter_node = true;
@ -405,23 +466,23 @@ void optimizePrewhere(Stack & stack, QueryPlan::Nodes &)
// }
// }
QueryPlan::Node * filter_parent_node = (stack.rbegin() + 2)->node;
// QueryPlan::Node * filter_parent_node = (stack.rbegin() + 2)->node;
for (auto & filter_parent_child : filter_parent_node->children)
{
if (filter_parent_child == filter_node)
{
filter_parent_child = frame.node;
// for (auto & filter_parent_child : filter_parent_node->children)
// {
// if (filter_parent_child == filter_node)
// {
// filter_parent_child = frame.node;
size_t stack_size = stack.size();
// size_t stack_size = stack.size();
/// Step is completely replaced with PREWHERE filter actions, remove it from stack.
std::swap(stack[stack_size - 1], stack[stack_size - 2]);
stack.pop_back();
// /// Step is completely replaced with PREWHERE filter actions, remove it from stack.
// std::swap(stack[stack_size - 1], stack[stack_size - 2]);
// stack.pop_back();
break;
}
}
// break;
// }
// }
}
}

View File

@ -89,6 +89,34 @@ size_t countPartitions(const MergeTreeData::DataPartsVector & prepared_parts)
return countPartitions(prepared_parts, get_partition_id);
}
bool restoreDAGInputs(ActionsDAG & dag, const NameSet & inputs)
{
std::unordered_set<const ActionsDAG::Node *> outputs(dag.getOutputs().begin(), dag.getOutputs().end());
bool added = false;
for (const auto * input : dag.getInputs())
{
if (inputs.contains(input->result_name) && !outputs.contains(input))
{
dag.getOutputs().push_back(input);
added = true;
}
}
return added;
}
bool restorePrewhereInputs(PrewhereInfo & info, const NameSet & inputs)
{
bool added = false;
if (info.row_level_filter)
added = added || restoreDAGInputs(*info.row_level_filter, inputs);
if (info.prewhere_actions)
added = added || restoreDAGInputs(*info.prewhere_actions, inputs);
return added;
}
}
namespace ProfileEvents
@ -786,18 +814,13 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
/// To fix this, we prohibit removing any input in prewhere actions. Instead, projection actions will be added after sorting.
/// See 02354_read_in_order_prewhere.sql as an example.
bool have_input_columns_removed_after_prewhere = false;
if (prewhere_info && prewhere_info->prewhere_actions)
if (prewhere_info)
{
auto & outputs = prewhere_info->prewhere_actions->getOutputs();
std::unordered_set<const ActionsDAG::Node *> outputs_set(outputs.begin(), outputs.end());
for (const auto * input : prewhere_info->prewhere_actions->getInputs())
{
if (!outputs_set.contains(input))
{
outputs.push_back(input);
have_input_columns_removed_after_prewhere = true;
}
}
NameSet sorting_columns;
for (const auto & column : metadata_for_reading->getSortingKey().expression->getRequiredColumnsWithTypes())
sorting_columns.insert(column.name);
have_input_columns_removed_after_prewhere = restorePrewhereInputs(*prewhere_info, sorting_columns);
}
/// Let's split ranges to avoid reading much data.
@ -984,7 +1007,6 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
/// Thus we need to merge all partition parts into a single sorted stream.
Pipe pipe = Pipe::unitePipes(std::move(pipes));
merge_streams(pipe);
out_projection = createProjection(pipe_header);
return pipe;
}
@ -1133,6 +1155,14 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
auto sorting_expr = std::make_shared<ExpressionActions>(metadata_for_reading->getSortingKey().expression->getActionsDAG().clone());
if (prewhere_info)
{
NameSet sorting_columns;
for (const auto & column : metadata_for_reading->getSortingKey().expression->getRequiredColumnsWithTypes())
sorting_columns.insert(column.name);
restorePrewhereInputs(*prewhere_info, sorting_columns);
}
for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index)
{
/// If do_not_merge_across_partitions_select_final is true and there is only one part in partition
@ -1802,13 +1832,20 @@ Pipe ReadFromMergeTree::spreadMarkRanges(
if (!final && result.sampling.use_sampling)
{
NameSet sampling_columns;
/// Add columns needed for `sample_by_ast` to `column_names_to_read`.
/// Skip this if final was used, because such columns were already added from PK.
for (const auto & column : result.sampling.filter_expression->getRequiredColumns().getNames())
{
if (!names.contains(column))
column_names_to_read.push_back(column);
sampling_columns.insert(column);
}
if (prewhere_info)
restorePrewhereInputs(*prewhere_info, sampling_columns);
}
if (final)
@ -2002,6 +2039,24 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
});
}
/// Some extra columns could be added by sample/final/in-order/etc
/// Remove them from header if not needed.
if (!blocksHaveEqualStructure(pipe.getHeader(), getOutputStream().header))
{
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
pipe.getHeader().getColumnsWithTypeAndName(),
getOutputStream().header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name,
true);
auto converting_dag_expr = std::make_shared<ExpressionActions>(convert_actions_dag);
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, converting_dag_expr);
});
}
for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);

View File

@ -449,8 +449,8 @@ Block MergeTreeSelectProcessor::applyPrewhereActions(Block block, const Prewhere
Block MergeTreeSelectProcessor::transformHeader(
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns)
{
injectVirtualColumns(block, 0, nullptr, partition_value_type, virtual_columns);
auto transformed = applyPrewhereActions(std::move(block), prewhere_info);
injectVirtualColumns(transformed, 0, nullptr, partition_value_type, virtual_columns);
return transformed;
}

View File

@ -132,14 +132,14 @@ MergeTreeWhereOptimizer::FilterActionsOptimizeResult MergeTreeWhereOptimizer::op
if (!optimize_result)
return {};
if (optimize_result->where_conditions.empty())
return {.prewhere_nodes = {}, .fully_moved_to_prewhere = true};
// if (optimize_result->where_conditions.empty())
// return {.prewhere_nodes = {}, .fully_moved_to_prewhere = true};
std::unordered_set<const ActionsDAG::Node *> prewhere_conditions;
for (const auto & condition : optimize_result->prewhere_conditions)
prewhere_conditions.insert(condition.node.getDAGNode());
return {.prewhere_nodes = std::move(prewhere_conditions), .fully_moved_to_prewhere = false};
return {.prewhere_nodes = std::move(prewhere_conditions), .fully_moved_to_prewhere = optimize_result->where_conditions.empty()};
}
static void collectColumns(const RPNBuilderTreeNode & node, const NameSet & columns_names, NameSet & result_set, bool & has_invalid_column)

View File

@ -52,7 +52,7 @@ SELECT _part_offset, foo FROM t_1 where granule == 0 AND _part_offset >= 100000
SELECT 'PREWHERE';
SELECT count(*), sum(_part_offset), sum(order_0) from t_1 prewhere granule == 0 where _part_offset >= 100000;
SELECT count(*), sum(_part_offset), sum(order_0) from t_1 prewhere _part != '' where granule == 0; -- { serverError 10 }
SELECT count(*), sum(_part_offset), sum(order_0) from t_1 prewhere _part_offset > 100000 where granule == 0; -- { serverError 10 }
SELECT count(*), sum(_part_offset), sum(order_0) from t_1 prewhere _part != '' where granule == 0; -- { serverError 10, 16 }
SELECT count(*), sum(_part_offset), sum(order_0) from t_1 prewhere _part_offset > 100000 where granule == 0; -- { serverError 10, 16 }
SELECT _part_offset FROM t_1 PREWHERE order_0 % 10000 == 42 ORDER BY order_0 LIMIT 3;
SELECT _part_offset, foo FROM t_1 PREWHERE order_0 % 10000 == 42 ORDER BY order_0 LIMIT 3;