This commit is contained in:
Hayk Manukyan 2024-09-10 12:40:44 +03:00
parent c812d60791
commit 0e4d55cd37
8 changed files with 42 additions and 30 deletions

View File

@ -1652,12 +1652,12 @@ bool SelectQueryExpressionAnalyzer::appendLimitInrangeFrom(ExpressionActionsChai
ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns); // // maybe other argument for lastStep?
getRootActions(select_query->limitInRangeFrom(), only_types, step.actions());
getRootActions(select_query->limitInRangeFrom(), only_types, step.actions()->dag);
auto limit_inrange_from_column_name = select_query->limitInRangeFrom()->getColumnName();
step.addRequiredOutput(limit_inrange_from_column_name);
const auto & node = step.actions()->findInOutputs(limit_inrange_from_column_name);
const auto & node = step.actions()->dag.findInOutputs(limit_inrange_from_column_name);
auto filter_type = node.result_type;
if (!filter_type->canBeUsedInBooleanContext())
@ -1675,11 +1675,11 @@ bool SelectQueryExpressionAnalyzer::appendLimitInrangeTo(ExpressionActionsChain
return false;
ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns); // maybe other argument for lastStep?
getRootActions(select_query->limitInRangeTo(), only_types, step.actions());
getRootActions(select_query->limitInRangeTo(), only_types, step.actions()->dag);
auto limit_inrange_to_column_name = select_query->limitInRangeTo()->getColumnName();
step.addRequiredOutput(limit_inrange_to_column_name);
const auto & node = step.actions()->findInOutputs(limit_inrange_to_column_name);
const auto & node = step.actions()->dag.findInOutputs(limit_inrange_to_column_name);
auto filter_type = node.result_type;
if (!filter_type->canBeUsedInBooleanContext())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER, "Invalid type for filter in LIMIT INRANGE TO: {}",
@ -2377,12 +2377,12 @@ std::string ExpressionAnalysisResult::dump() const
if (before_limit_inrange_from)
{
ss << "before_limit_inrange_from " << before_limit_inrange_from->dumpDAG() << "\n";
ss << "before_limit_inrange_from " << before_limit_inrange_from->dag.dumpDAG() << "\n";
}
if (before_limit_inrange_to)
{
ss << "before_limit_inrange_to " << before_limit_inrange_to->dumpDAG() << "\n";
ss << "before_limit_inrange_to " << before_limit_inrange_to->dag.dumpDAG() << "\n";
}
if (before_limit_by)

View File

@ -1973,16 +1973,22 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
{
if (expressions.before_limit_inrange_from && expressions.before_limit_inrange_to)
{
ActionsDAGPtr first_dag = expressions.before_limit_inrange_from->clone();
ActionsDAGPtr second_dag = expressions.before_limit_inrange_to->clone();
ActionsAndProjectInputsFlagPtr first_flag = std::make_shared<ActionsAndProjectInputsFlag>();
first_flag->dag = expressions.before_limit_inrange_from->dag.clone();
ActionsAndProjectInputsFlagPtr second_flag = std::make_shared<ActionsAndProjectInputsFlag>();
second_flag->dag = expressions.before_limit_inrange_to->dag.clone();
first_dag->mergeNodes(std::move(*second_dag));
auto last_node_name = expressions.before_limit_inrange_to->getNodes().back().result_name;
for (const auto & node : first_dag->getNodes())
first_flag->dag.mergeNodes(std::move(second_flag->dag));
auto last_node_name = expressions.before_limit_inrange_to->dag.getNodes().back().result_name;
for (const auto & node : first_flag->dag.getNodes())
if (last_node_name == node.result_name)
first_dag->addOrReplaceInOutputs(node);
first_flag->dag.addOrReplaceInOutputs(node);
executeExpression(query_plan, first_dag, "LIMIT INRANGE FROM expr TO expr");
// TODO case when there are same expressions in FROM and TO sections:
// SELECT * FROM my_first_table LIMIT INRANGE FROM metric > 2 TO metric > 2
// dag.addNode method?
executeExpression(query_plan, first_flag, "LIMIT INRANGE FROM expr TO expr");
}
else if (expressions.before_limit_inrange_from)
{

View File

@ -1772,12 +1772,12 @@ void Planner::buildPlanForQueryNode()
if (query_node.hasLimitInrangeFrom() || query_node.hasLimitInrangeTo())
{
const auto & limit_inrange_analysis_result = expression_analysis_result.getLimitInRange();
auto & limit_inrange_analysis_result = expression_analysis_result.getLimitInRange();
addExpressionStep(
query_plan,
limit_inrange_analysis_result.combined_limit_inrange_actions,
"LIMIT INRANGE expressions",
result_actions_to_execute);
useful_sets);
addLimitInRangeStep(query_plan, limit_inrange_analysis_result, "LIMIT INRANGE");
}

View File

@ -1,3 +1,4 @@
#include <memory>
#include <Planner/PlannerExpressionAnalysis.h>
#include <Columns/ColumnNullable.h>
@ -76,29 +77,33 @@ LimitInRangeAnalysisResult analyzeLimitInRange(
{
auto first_dag = buildActionsDAGFromExpressionNode(from_filter_expression_node, input_columns, planner_context);
auto second_dag = buildActionsDAGFromExpressionNode(to_filter_expression_node, input_columns, planner_context);
result.from_filter_column_name = first_dag->getOutputs().at(0)->result_name;
result.to_filter_column_name = second_dag->getOutputs().at(0)->result_name;
result.from_filter_column_name = first_dag.getOutputs().at(0)->result_name;
result.to_filter_column_name = second_dag.getOutputs().at(0)->result_name;
auto last_node_name = second_dag->getNodes().back().result_name;
first_dag->mergeNodes(std::move(*second_dag));
auto last_node_name = second_dag.getNodes().back().result_name;
first_dag.mergeNodes(std::move(second_dag));
for (const auto & node : first_dag->getNodes())
for (const auto & node : first_dag.getNodes())
if (last_node_name == node.result_name)
first_dag->addOrReplaceInOutputs(node);
first_dag.addOrReplaceInOutputs(node);
result.combined_limit_inrange_actions = first_dag;
result.combined_limit_inrange_actions = std::make_shared<ActionsAndProjectInputsFlag>();
result.combined_limit_inrange_actions->dag = std::move(first_dag);
}
else if (from_filter_expression_node)
{
result.combined_limit_inrange_actions
result.combined_limit_inrange_actions = std::make_shared<ActionsAndProjectInputsFlag>();
result.combined_limit_inrange_actions->dag
= buildActionsDAGFromExpressionNode(from_filter_expression_node, input_columns, planner_context);
result.from_filter_column_name = result.combined_limit_inrange_actions->getOutputs().at(0)->result_name;
result.from_filter_column_name = result.combined_limit_inrange_actions->dag.getOutputs().at(0)->result_name;
}
else if (to_filter_expression_node)
{
result.combined_limit_inrange_actions
result.combined_limit_inrange_actions = std::make_shared<ActionsAndProjectInputsFlag>();
result.combined_limit_inrange_actions->dag
= buildActionsDAGFromExpressionNode(to_filter_expression_node, input_columns, planner_context);
result.to_filter_column_name = result.combined_limit_inrange_actions->getOutputs().at(0)->result_name;
result.to_filter_column_name = result.combined_limit_inrange_actions->dag.getOutputs().at(0)->result_name;
}
actions_chain.addStep(std::make_unique<ActionsChainStep>(result.combined_limit_inrange_actions));

View File

@ -59,7 +59,7 @@ struct LimitByAnalysisResult
struct LimitInRangeAnalysisResult
{
ActionsDAGPtr combined_limit_inrange_actions;
ActionsAndProjectInputsFlagPtr combined_limit_inrange_actions;
std::string from_filter_column_name;
std::string to_filter_column_name;
bool remove_filter_column = true; // need to configure
@ -187,7 +187,7 @@ public:
return limit_inrange_analysis_result.combined_limit_inrange_actions != nullptr;
}
const LimitInRangeAnalysisResult & getLimitInRange() const
LimitInRangeAnalysisResult & getLimitInRange()
{
return limit_inrange_analysis_result;
}

View File

@ -50,7 +50,7 @@ void LimitInRangeStep::transformPipeline(QueryPipelineBuilder & pipeline, const
pipeline.getHeader().getColumnsWithTypeAndName(),
output_stream->header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto convert_actions = std::make_shared<ExpressionActions>(convert_actions_dag, settings.getActionsSettings());
auto convert_actions = std::make_shared<ExpressionActions>(std::move(convert_actions_dag), settings.getActionsSettings());
pipeline.addSimpleTransform([&](const Block & header) { return std::make_shared<ExpressionTransform>(header, convert_actions); });
}

View File

@ -86,6 +86,7 @@ LimitInRangeTransform::LimitInRangeTransform(
to_filter_column_position = transformed_header.getPositionByName(to_filter_column_name);
std::cerr << "Constructor header ending structure: " << transformed_header.dumpStructure() << '\n';
std::cerr << on_totals << '\n';
}
IProcessor::Status LimitInRangeTransform::prepare()

View File

@ -46,7 +46,7 @@ private:
/// Header after expression, but before removing filter column.
Block transformed_header;
bool are_prepared_sets_initialized = false;
// bool are_prepared_sets_initialized = false;
void doFromTransform(Chunk & chunk);
void doToTransform(Chunk & chunk);