This commit is contained in:
Hayk Manukyan 2024-09-19 15:51:19 +03:00 committed by GitHub
commit 015e5d053a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 1050 additions and 53 deletions

View File

@ -14,7 +14,7 @@ namespace DB
* 3. All functions are resolved. Next passes can expect that for each FunctionNode its result type will be set, and it will be resolved
* as aggregate or non aggregate function.
* 4. All lambda expressions that are function arguments are resolved. Next passes can expect that LambdaNode expression is resolved, and lambda has concrete arguments.
* 5. All standalone lambda expressions are resolved. Next passes can expect that there will be no standalone LambaNode expressions in query.
* 5. All standalone lambda expressions are resolved. Next passes can expect that there will be no standalone LambdaNode expressions in query.
* 6. Constants are folded. Example: SELECT plus(1, 1).
* Motivation for this, there are places in query tree that must contain constant:
* Function parameters. Example: SELECT quantile(0.5)(x).

View File

@ -232,6 +232,24 @@ void QueryNode::dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state, s
getLimitByOffset()->dumpTreeImpl(buffer, format_state, indent + 4);
}
if (hasLimitInrangeFrom() && hasLimitInrangeTo())
{
buffer << '\n' << std::string(indent + 2, ' ') << "LIMIT INRANGE FROM\n";
getLimitInrangeFrom()->dumpTreeImpl(buffer, format_state, indent + 4);
buffer << '\n' << std::string(indent + 2, ' ') << "TO\n";
getLimitInrangeTo()->dumpTreeImpl(buffer, format_state, indent + 4);
}
else if (hasLimitInrangeFrom())
{
buffer << '\n' << std::string(indent + 2, ' ') << "LIMIT INRANGE FROM\n";
getLimitInrangeFrom()->dumpTreeImpl(buffer, format_state, indent + 4);
}
else if (hasLimitInrangeTo())
{
buffer << '\n' << std::string(indent + 2, ' ') << "LIMIT INRANGE TO\n";
getLimitInrangeTo()->dumpTreeImpl(buffer, format_state, indent + 4);
}
if (hasLimitBy())
{
buffer << '\n' << std::string(indent + 2, ' ') << "LIMIT BY\n";
@ -445,6 +463,12 @@ ASTPtr QueryNode::toASTImpl(const ConvertToASTOptions & options) const
if (hasLimitByOffset())
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_OFFSET, getLimitByOffset()->toAST(options));
if (hasLimitInrangeFrom())
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_INRANGE_FROM, getLimitInrangeFrom()->toAST(options));
if (hasLimitInrangeTo())
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_INRANGE_TO, getLimitInrangeTo()->toAST(options));
if (hasLimitBy())
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY, getLimitBy().toAST(options));

View File

@ -530,6 +530,42 @@ public:
return children[limit_by_offset_child_index];
}
/// Tells whether the LIMIT INRANGE FROM child slot of this query node holds a node.
bool hasLimitInrangeFrom() const
{
    const auto & from_section = children[limit_inrange_from_child_index];
    return from_section != nullptr;
}
/// Read-only access to the node stored in the LIMIT INRANGE FROM child slot (may be null).
const QueryTreeNodePtr & getLimitInrangeFrom() const
{
    const QueryTreeNodePtr & from_section = children[limit_inrange_from_child_index];
    return from_section;
}
/// Mutable access to the LIMIT INRANGE FROM child slot, so the section can be assigned in place.
QueryTreeNodePtr & getLimitInrangeFrom()
{
    QueryTreeNodePtr & from_section = children[limit_inrange_from_child_index];
    return from_section;
}
/// Tells whether the LIMIT INRANGE TO child slot of this query node holds a node.
bool hasLimitInrangeTo() const
{
    const auto & to_section = children[limit_inrange_to_child_index];
    return to_section != nullptr;
}
/// Read-only access to the node stored in the LIMIT INRANGE TO child slot (may be null).
const QueryTreeNodePtr & getLimitInrangeTo() const
{
    const QueryTreeNodePtr & to_section = children[limit_inrange_to_child_index];
    return to_section;
}
/// Mutable access to the LIMIT INRANGE TO child slot, so the section can be assigned in place.
QueryTreeNodePtr & getLimitInrangeTo()
{
    QueryTreeNodePtr & to_section = children[limit_inrange_to_child_index];
    return to_section;
}
/// Returns true if query node LIMIT BY section is not empty, false otherwise
bool hasLimitBy() const
{
@ -656,11 +692,13 @@ private:
static constexpr size_t qualify_child_index = 8;
static constexpr size_t order_by_child_index = 9;
static constexpr size_t interpolate_child_index = 10;
static constexpr size_t limit_by_limit_child_index = 11;
static constexpr size_t limit_by_offset_child_index = 12;
static constexpr size_t limit_by_child_index = 13;
static constexpr size_t limit_child_index = 14;
static constexpr size_t offset_child_index = 15;
static constexpr size_t limit_inrange_from_child_index = 11;
static constexpr size_t limit_inrange_to_child_index = 12;
static constexpr size_t limit_by_limit_child_index = 13;
static constexpr size_t limit_by_offset_child_index = 14;
static constexpr size_t limit_by_child_index = 15;
static constexpr size_t limit_child_index = 16;
static constexpr size_t offset_child_index = 17;
static constexpr size_t children_size = offset_child_index + 1;
};

View File

@ -383,6 +383,14 @@ QueryTreeNodePtr QueryTreeBuilder::buildSelectExpression(const ASTPtr & select_q
if (select_limit_by_offset)
current_query_tree->getLimitByOffset() = buildExpression(select_limit_by_offset, current_context);
auto select_limit_inrange_from = select_query_typed.limitInRangeFrom();
if (select_limit_inrange_from)
current_query_tree->getLimitInrangeFrom() = buildExpression(select_limit_inrange_from, current_context);
auto select_limit_inrange_to = select_query_typed.limitInRangeTo();
if (select_limit_inrange_to)
current_query_tree->getLimitInrangeTo() = buildExpression(select_limit_inrange_to, current_context);
auto select_limit_by = select_query_typed.limitBy();
if (select_limit_by)
current_query_tree->getLimitByNode() = buildExpressionList(select_limit_by, current_context);

View File

@ -5340,6 +5340,12 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
if (query_node_typed.hasInterpolate())
visitor.visit(query_node_typed.getInterpolate());
if (query_node_typed.hasLimitInrangeFrom())
visitor.visit(query_node_typed.getLimitInrangeFrom());
if (query_node_typed.hasLimitInrangeTo())
visitor.visit(query_node_typed.getLimitInrangeTo());
if (query_node_typed.hasLimitByLimit())
visitor.visit(query_node_typed.getLimitByLimit());
@ -5504,6 +5510,12 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
if (query_node_typed.hasInterpolate())
resolveInterpolateColumnsNodeList(query_node_typed.getInterpolate(), scope);
if (query_node_typed.hasLimitInrangeFrom())
resolveExpressionNode(query_node_typed.getLimitInrangeFrom(), scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);
if (query_node_typed.hasLimitInrangeTo())
resolveExpressionNode(query_node_typed.getLimitInrangeTo(), scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);
if (query_node_typed.hasLimitByLimit())
{
resolveExpressionNode(query_node_typed.getLimitByLimit(), scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);

View File

@ -1665,6 +1665,51 @@ ActionsAndProjectInputsFlagPtr SelectQueryExpressionAnalyzer::appendOrderBy(
return actions;
}
/// Adds the LIMIT INRANGE FROM expression to the last step of the chain and marks its column as a required output.
/// Returns false when the query has no LIMIT INRANGE FROM section, true otherwise.
/// Throws ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER if the expression type cannot be used in a boolean context.
bool SelectQueryExpressionAnalyzer::appendLimitInrangeFrom(ExpressionActionsChain & chain, bool only_types)
{
    const auto from_ast = getSelectQuery()->limitInRangeFrom();
    if (!from_ast)
        return false;

    /// NOTE(review): the original author was unsure whether aggregated_columns is the right argument for lastStep.
    ExpressionActionsChain::Step & last_step = chain.lastStep(aggregated_columns);
    getRootActions(from_ast, only_types, last_step.actions()->dag);

    const auto from_column_name = from_ast->getColumnName();
    last_step.addRequiredOutput(from_column_name);

    const auto & from_type = last_step.actions()->dag.findInOutputs(from_column_name).result_type;
    if (from_type->canBeUsedInBooleanContext())
        return true;

    throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER, "Invalid type for filter in LIMIT INRANGE FROM: {}",
        from_type->getName());
}
/// Adds the LIMIT INRANGE TO expression to the last step of the chain and marks its column as a required output.
/// Returns false when the query has no LIMIT INRANGE TO section, true otherwise.
/// Throws ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER if the expression type cannot be used in a boolean context.
bool SelectQueryExpressionAnalyzer::appendLimitInrangeTo(ExpressionActionsChain & chain, bool only_types)
{
    const auto to_ast = getSelectQuery()->limitInRangeTo();
    if (!to_ast)
        return false;

    /// NOTE(review): the original author was unsure whether aggregated_columns is the right argument for lastStep.
    ExpressionActionsChain::Step & last_step = chain.lastStep(aggregated_columns);
    getRootActions(to_ast, only_types, last_step.actions()->dag);

    const auto to_column_name = to_ast->getColumnName();
    last_step.addRequiredOutput(to_column_name);

    const auto & to_type = last_step.actions()->dag.findInOutputs(to_column_name).result_type;
    if (to_type->canBeUsedInBooleanContext())
        return true;

    throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER, "Invalid type for filter in LIMIT INRANGE TO: {}",
        to_type->getName());
}
bool SelectQueryExpressionAnalyzer::appendLimitBy(ExpressionActionsChain & chain, bool only_types)
{
const auto * select_query = getSelectQuery();
@ -2197,6 +2242,21 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
chain.addStep();
}
if (query_analyzer.appendLimitInrangeFrom(chain, only_types || !second_stage)) // !first_stage?
{
// where_step_num = chain.steps.size() - 1; ??
before_limit_inrange_from = chain.getLastActions();
// if (allowEarlyConstantFolding(*before_where, settings)) ??
chain.addStep();
}
if (query_analyzer.appendLimitInrangeTo(chain, only_types || !second_stage)) // !first_stage?
{
// where_step_num = chain.steps.size() - 1; ??
before_limit_inrange_to = chain.getLastActions();
// if (allowEarlyConstantFolding(*before_where, settings)) ??
chain.addStep();
}
final_projection = query_analyzer.appendProjectResult(chain);
finalize_chain(chain);
@ -2329,6 +2389,16 @@ std::string ExpressionAnalysisResult::dump() const
ss << "before_order_by " << before_order_by->dag.dumpDAG() << "\n";
}
if (before_limit_inrange_from)
{
ss << "before_limit_inrange_from " << before_limit_inrange_from->dag.dumpDAG() << "\n";
}
if (before_limit_inrange_to)
{
ss << "before_limit_inrange_to " << before_limit_inrange_to->dag.dumpDAG() << "\n";
}
if (before_limit_by)
{
ss << "before_limit_by " << before_limit_by->dag.dumpDAG() << "\n";

View File

@ -226,6 +226,7 @@ struct ExpressionAnalysisResult
String where_column_name;
bool remove_where_filter = false;
bool remove_inrange_filter = false;
bool optimize_read_in_order = false;
bool optimize_aggregation_in_order = false;
bool join_has_delayed_stream = false;
@ -242,9 +243,12 @@ struct ExpressionAnalysisResult
ActionsAndProjectInputsFlagPtr before_having;
String having_column_name;
bool remove_having_filter = false;
ActionsAndProjectInputsFlagPtr before_window;
ActionsAndProjectInputsFlagPtr before_order_by;
ActionsAndProjectInputsFlagPtr before_limit_by;
ActionsAndProjectInputsFlagPtr before_limit_inrange_from;
ActionsAndProjectInputsFlagPtr before_limit_inrange_to;
ActionsAndProjectInputsFlagPtr final_projection;
/// Columns from the SELECT list, before renaming them to aliases. Used to
@ -284,6 +288,8 @@ struct ExpressionAnalysisResult
bool hasPrewhere() const { return prewhere_info.get(); }
bool hasWhere() const { return before_where.get(); }
bool hasHaving() const { return before_having.get(); }
/// True when actions for the LIMIT INRANGE FROM expression were built.
bool hasLimitInrangeFrom() const { return before_limit_inrange_from != nullptr; }
/// True when actions for the LIMIT INRANGE TO expression were built.
bool hasLimitInrangeTo() const { return before_limit_inrange_to != nullptr; }
bool hasLimitBy() const { return before_limit_by.get(); }
void removeExtraColumns() const;
@ -407,6 +413,8 @@ private:
/// After aggregation:
bool appendHaving(ExpressionActionsChain & chain, bool only_types);
/// appendSelect
bool appendLimitInrangeFrom(ExpressionActionsChain & chain, bool only_types);
bool appendLimitInrangeTo(ExpressionActionsChain & chain, bool only_types);
ActionsAndProjectInputsFlagPtr appendOrderBy(ExpressionActionsChain & chain, bool only_types, bool optimize_read_in_order, ManyExpressionActions &);
bool appendLimitBy(ExpressionActionsChain & chain, bool only_types);
/// appendProjectResult

View File

@ -56,6 +56,7 @@
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/LimitByStep.h>
#include <Processors/QueryPlan/LimitInRangeStep.h>
#include <Processors/QueryPlan/LimitStep.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Processors/QueryPlan/MergingAggregatedStep.h>
@ -498,6 +499,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
, metadata_snapshot(metadata_snapshot_)
, prepared_sets(prepared_sets_)
{
std::cerr << "InterpreterSelectQuery::InterpreterSelectQuery(_starts\n";
checkStackSize();
if (!prepared_sets)
@ -900,7 +902,9 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
/// Calculate structure of the result.
std::cerr << "result_header = getSampleBlockImpl()_1";
result_header = getSampleBlockImpl();
std::cerr << "result_header = getSampleBlockImpl()_2";
};
@ -999,6 +1003,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
/// Blocks used in expression analysis contains size 1 const columns for constant folding and
/// null non-const columns to avoid useless memory allocations. However, a valid block sample
/// requires all columns to be of size 0, thus we need to sanitize the block here.
std::cerr << "here0\n";
sanitizeBlock(result_header, true);
}
@ -1086,6 +1091,7 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis()
void InterpreterSelectQuery::buildQueryPlan(QueryPlan & query_plan)
{
std::cerr << "here2";
executeImpl(query_plan, std::move(input_pipe));
/// We must guarantee that result structure is the same as in getSampleBlock()
@ -1113,16 +1119,16 @@ BlockIO InterpreterSelectQuery::execute()
{
    /// Build the query plan, turn it into a pipeline and attach the quota to that pipeline.
    /// The temporary std::cerr progress traces were dropped: they wrote to the process stderr on every query.
    BlockIO res;
    QueryPlan query_plan;

    buildQueryPlan(query_plan);

    auto builder = query_plan.buildQueryPipeline(
        QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));

    res.pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));

    setQuota(res.pipeline);
    return res;
}
@ -1161,9 +1167,13 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
bool second_stage = from_stage <= QueryProcessingStage::WithMergeableState
&& options.to_stage > QueryProcessingStage::WithMergeableState;
std::cerr << "before ExpressionAnalysisResult\n";
analysis_result = ExpressionAnalysisResult(
*query_analyzer, metadata_snapshot, first_stage, second_stage, options.only_analyze, filter_info, additional_filter_info, source_header);
std::cerr << "after ExpressionAnalysisResult\n";
// std::cerr << analysis_result.dump() << '\n';
// std::cerr << "after dump\n";
if (options.to_stage == QueryProcessingStage::Enum::FetchColumns)
{
auto header = source_header;
@ -1571,7 +1581,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
bool intermediate_stage = false;
bool to_aggregation_stage = false;
bool from_aggregation_stage = false;
std::cerr << "HERE3\n";
/// Do I need to aggregate in a separate row that has not passed max_rows_to_group_by?
bool aggregate_overflow_row =
expressions.need_aggregate &&
@ -2079,6 +2089,39 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
executeLimitBy(query_plan);
}
if (expressions.hasLimitInrangeFrom() || expressions.hasLimitInrangeTo())
{
if (expressions.before_limit_inrange_from && expressions.before_limit_inrange_to)
{
ActionsAndProjectInputsFlagPtr first_flag = std::make_shared<ActionsAndProjectInputsFlag>();
first_flag->dag = expressions.before_limit_inrange_from->dag.clone();
ActionsAndProjectInputsFlagPtr second_flag = std::make_shared<ActionsAndProjectInputsFlag>();
second_flag->dag = expressions.before_limit_inrange_to->dag.clone();
first_flag->dag.mergeNodes(std::move(second_flag->dag));
auto last_node_name = expressions.before_limit_inrange_to->dag.getNodes().back().result_name;
for (const auto & node : first_flag->dag.getNodes())
if (last_node_name == node.result_name)
first_flag->dag.addOrReplaceInOutputs(node);
// TODO case when there are same expressions in FROM and TO sections:
// SELECT * FROM my_first_table LIMIT INRANGE FROM metric > 2 TO metric > 2
// dag.addNode method?
executeExpression(query_plan, first_flag, "LIMIT INRANGE FROM expr TO expr");
}
else if (expressions.before_limit_inrange_from)
{
executeExpression(query_plan, expressions.before_limit_inrange_from, "LIMIT INRANGE FROM expr");
}
else if (expressions.before_limit_inrange_to)
{
executeExpression(query_plan, expressions.before_limit_inrange_to, "LIMIT INRANGE TO expr");
}
executeLimitInRange(query_plan, expressions.remove_inrange_filter);
}
executeWithFill(query_plan);
/// If we have 'WITH TIES', we need execute limit before projection,
@ -2726,6 +2769,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
void InterpreterSelectQuery::executeWhere(QueryPlan & query_plan, const ActionsAndProjectInputsFlagPtr & expression, bool remove_filter)
{
std::cerr << "InterpreterSelectQuery::executeWhere starts\n";
auto dag = expression->dag.clone();
if (expression->project_input)
dag.appendInputsForUnusedColumns(query_plan.getCurrentDataStream().header);
@ -2735,6 +2779,7 @@ void InterpreterSelectQuery::executeWhere(QueryPlan & query_plan, const ActionsA
where_step->setStepDescription("WHERE");
query_plan.addStep(std::move(where_step));
std::cerr << "InterpreterSelectQuery::executeWhere ends\n";
}
static Aggregator::Params getAggregatorParams(
@ -3203,6 +3248,20 @@ void InterpreterSelectQuery::executePreLimit(QueryPlan & query_plan, bool do_not
}
}
/** Appends a LimitInRangeStep to the plan on top of its current data stream.
  *
  * The step receives the column names of the LIMIT INRANGE FROM / TO expressions;
  * an empty name is passed for a section that is absent from the query.
  *
  * @param query_plan            Plan to extend.
  * @param remove_filter_column  Currently ignored, see the note below.
  */
void InterpreterSelectQuery::executeLimitInRange(QueryPlan & query_plan, bool remove_filter_column)
{
    /// NOTE(review): the caller-supplied flag is overridden on purpose for now, so the filter
    /// columns are always removed by the step. Nothing visible here ever sets the flag the
    /// caller passes; honour the argument once that is wired up.
    remove_filter_column = true;

    /// Fetch each section once instead of re-reading it from the query for the null check and the name.
    const auto & select_query = getSelectQuery();
    const auto from_ast = select_query.limitInRangeFrom();
    const auto to_ast = select_query.limitInRangeTo();

    auto limit_inrange_step = std::make_unique<LimitInRangeStep>(
        query_plan.getCurrentDataStream(),
        from_ast ? from_ast->getColumnName() : "",
        to_ast ? to_ast->getColumnName() : "",
        remove_filter_column);

    limit_inrange_step->setStepDescription("LIMIT INRANGE");
    query_plan.addStep(std::move(limit_inrange_step));
}
void InterpreterSelectQuery::executeLimitBy(QueryPlan & query_plan)
{

View File

@ -187,6 +187,7 @@ private:
void executeWithFill(QueryPlan & query_plan);
void executeMergeSorted(QueryPlan & query_plan, const std::string & description);
void executePreLimit(QueryPlan & query_plan, bool do_not_skip_offset);
void executeLimitInRange(QueryPlan & query_plan, bool remove_filter_column);
void executeLimitBy(QueryPlan & query_plan);
void executeLimit(QueryPlan & query_plan);
void executeOffset(QueryPlan & query_plan);

View File

@ -313,6 +313,7 @@ void InterpreterSelectWithUnionQuery::buildQueryPlan(QueryPlan & query_plan)
/// Skip union for single interpreter.
if (num_plans == 1)
{
std::cerr << "HERE1\n";
nested_interpreters.front()->buildQueryPlan(query_plan);
}
else

View File

@ -192,6 +192,20 @@ void ASTSelectQuery::formatImpl(const FormatSettings & s, FormatState & state, F
}
}
if (limitInRangeFrom()) {
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "LIMIT INRANGE FROM " << (s.hilite ? hilite_none : "");
limitInRangeFrom()->formatImpl(s, state, frame);
}
if (limitInRangeTo()) {
if (limitInRangeFrom())
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "TO " << (s.hilite ? hilite_none : "");
else
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "LIMIT INRANGE TO " << (s.hilite ? hilite_none : "");
limitInRangeTo()->formatImpl(s, state, frame);
}
if (limitByLength())
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "LIMIT " << (s.hilite ? hilite_none : "");

View File

@ -32,6 +32,8 @@ public:
LIMIT_BY,
LIMIT_OFFSET,
LIMIT_LENGTH,
LIMIT_INRANGE_FROM,
LIMIT_INRANGE_TO,
SETTINGS,
INTERPOLATE
};
@ -70,6 +72,10 @@ public:
return "LIMIT OFFSET";
case Expression::LIMIT_LENGTH:
return "LIMIT LENGTH";
case Expression::LIMIT_INRANGE_FROM:
return "LIMIT INRANGE FROM";
case Expression::LIMIT_INRANGE_TO:
    return "LIMIT INRANGE TO";
case Expression::SETTINGS:
return "SETTINGS";
case Expression::INTERPOLATE:
@ -116,6 +122,8 @@ public:
ASTPtr limitBy() const { return getExpression(Expression::LIMIT_BY); }
ASTPtr limitOffset() const { return getExpression(Expression::LIMIT_OFFSET); }
ASTPtr limitLength() const { return getExpression(Expression::LIMIT_LENGTH); }
/// LIMIT INRANGE FROM expression of the query; callers treat a null result as "section absent".
ASTPtr limitInRangeFrom() const
{
    return getExpression(Expression::LIMIT_INRANGE_FROM);
}
/// LIMIT INRANGE TO expression of the query; callers treat a null result as "section absent".
ASTPtr limitInRangeTo() const
{
    return getExpression(Expression::LIMIT_INRANGE_TO);
}
ASTPtr settings() const { return getExpression(Expression::SETTINGS); }
ASTPtr interpolate() const { return getExpression(Expression::INTERPOLATE); }

View File

@ -233,6 +233,7 @@ namespace DB
MR_MACROS(IGNORE_NULLS, "IGNORE NULLS") \
MR_MACROS(ILIKE, "ILIKE") \
MR_MACROS(IN_PARTITION, "IN PARTITION") \
MR_MACROS(INRANGE, "INRANGE") \
MR_MACROS(IN, "IN") \
MR_MACROS(INDEX, "INDEX") \
MR_MACROS(INDEXES, "INDEXES") \

View File

@ -53,6 +53,8 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_qualify(Keyword::QUALIFY);
ParserKeyword s_order_by(Keyword::ORDER_BY);
ParserKeyword s_limit(Keyword::LIMIT);
ParserKeyword s_inrange(Keyword::INRANGE); // maybe IN_RANGE/IN RANGE/RANGE or even without this keyword?
ParserKeyword s_to(Keyword::TO);
ParserKeyword s_settings(Keyword::SETTINGS);
ParserKeyword s_by(Keyword::BY);
ParserKeyword s_rollup(Keyword::ROLLUP);
@ -99,6 +101,8 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ASTPtr limit_length;
ASTPtr top_length;
ASTPtr settings;
ASTPtr limit_inrange_from_expression;
ASTPtr limit_inrange_to_expression;
/// WITH expr_list
{
@ -309,6 +313,8 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
}
// if (s_limit.ignore(pos, expected) && s_inrange.ignore(pos, expected))
/// This is needed for TOP expression, because it can also use WITH TIES.
bool limit_with_ties_occured = false;
@ -318,61 +324,86 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
/// LIMIT length | LIMIT offset, length | LIMIT count BY expr-list | LIMIT offset, length BY expr-list
if (s_limit.ignore(pos, expected))
{
ParserToken s_comma(TokenType::Comma);
if (!exp_elem.parse(pos, limit_length, expected))
return false;
if (s_comma.ignore(pos, expected))
/// LIMIT INRANGE FROM expr | LIMIT INRANGE TO expr | LIMIT INRANGE FROM expr TO expr
if (s_inrange.ignore(pos, expected))
{
limit_offset = limit_length;
bool from_occured = false;
bool to_occured = false;
if (s_from.ignore(pos, expected))
{
if (!exp_elem.parse(pos, limit_inrange_from_expression, expected))
return false;
from_occured = true;
}
if (s_to.ignore(pos, expected))
{
if (!exp_elem.parse(pos, limit_inrange_to_expression, expected))
return false;
to_occured = true;
}
if (!from_occured && !to_occured)
throw Exception(ErrorCodes::SYNTAX_ERROR, "LIMIT INRANGE requires at least a FROM or TO expression");
} else
{
ParserToken s_comma(TokenType::Comma);
if (!exp_elem.parse(pos, limit_length, expected))
return false;
if (s_with_ties.ignore(pos, expected))
if (s_comma.ignore(pos, expected))
{
limit_offset = limit_length;
if (!exp_elem.parse(pos, limit_length, expected))
return false;
if (s_with_ties.ignore(pos, expected))
{
limit_with_ties_occured = true;
select_query->limit_with_ties = true;
}
}
else if (s_offset.ignore(pos, expected))
{
if (!exp_elem.parse(pos, limit_offset, expected))
return false;
has_offset_clause = true;
}
else if (s_with_ties.ignore(pos, expected))
{
limit_with_ties_occured = true;
select_query->limit_with_ties = true;
}
}
else if (s_offset.ignore(pos, expected))
{
if (!exp_elem.parse(pos, limit_offset, expected))
return false;
has_offset_clause = true;
}
else if (s_with_ties.ignore(pos, expected))
{
limit_with_ties_occured = true;
select_query->limit_with_ties = true;
}
if (limit_with_ties_occured && distinct_on_expression_list)
throw Exception(ErrorCodes::LIMIT_BY_WITH_TIES_IS_NOT_SUPPORTED, "Can not use WITH TIES alongside LIMIT BY/DISTINCT ON");
if (s_by.ignore(pos, expected))
{
/// WITH TIES was used alongside LIMIT BY
/// But there are other kind of queries like LIMIT n BY smth LIMIT m WITH TIES which are allowed.
/// So we have to ignore WITH TIES exactly in LIMIT BY state.
if (limit_with_ties_occured)
if (limit_with_ties_occured && distinct_on_expression_list)
throw Exception(ErrorCodes::LIMIT_BY_WITH_TIES_IS_NOT_SUPPORTED, "Can not use WITH TIES alongside LIMIT BY/DISTINCT ON");
if (distinct_on_expression_list)
throw Exception(ErrorCodes::SYNTAX_ERROR, "Can not use DISTINCT ON alongside LIMIT BY");
if (s_by.ignore(pos, expected))
{
/// WITH TIES was used alongside LIMIT BY
/// But there are other kind of queries like LIMIT n BY smth LIMIT m WITH TIES which are allowed.
/// So we have to ignore WITH TIES exactly in LIMIT BY state.
if (limit_with_ties_occured)
throw Exception(ErrorCodes::LIMIT_BY_WITH_TIES_IS_NOT_SUPPORTED, "Can not use WITH TIES alongside LIMIT BY/DISTINCT ON");
limit_by_length = limit_length;
limit_by_offset = limit_offset;
limit_length = nullptr;
limit_offset = nullptr;
if (distinct_on_expression_list)
throw Exception(ErrorCodes::SYNTAX_ERROR, "Can not use DISTINCT ON alongside LIMIT BY");
if (!exp_list.parse(pos, limit_by_expression_list, expected))
return false;
limit_by_length = limit_length;
limit_by_offset = limit_offset;
limit_length = nullptr;
limit_offset = nullptr;
if (!exp_list.parse(pos, limit_by_expression_list, expected))
return false;
}
if (top_length && limit_length)
throw Exception(ErrorCodes::TOP_AND_LIMIT_TOGETHER, "Can not use TOP and LIMIT together");
}
if (top_length && limit_length)
throw Exception(ErrorCodes::TOP_AND_LIMIT_TOGETHER, "Can not use TOP and LIMIT together");
}
else if (s_offset.ignore(pos, expected))
{
@ -503,6 +534,8 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
select_query->setExpression(ASTSelectQuery::Expression::WINDOW, std::move(window_list));
select_query->setExpression(ASTSelectQuery::Expression::QUALIFY, std::move(qualify_expression));
select_query->setExpression(ASTSelectQuery::Expression::ORDER_BY, std::move(order_expression_list));
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_INRANGE_FROM, std::move(limit_inrange_from_expression));
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_INRANGE_TO, std::move(limit_inrange_to_expression));
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_OFFSET, std::move(limit_by_offset));
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_LENGTH, std::move(limit_by_length));
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY, std::move(limit_by_expression_list));

View File

@ -34,6 +34,7 @@
#include <Processors/QueryPlan/RollupStep.h>
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/QueryPlan/LimitByStep.h>
#include <Processors/QueryPlan/LimitInRangeStep.h>
#include <Processors/QueryPlan/WindowStep.h>
#include <Processors/QueryPlan/ReadNothingStep.h>
#include <Processors/QueryPlan/ReadFromRecursiveCTEStep.h>
@ -410,6 +411,17 @@ void addFilterStep(QueryPlan & query_plan,
query_plan.addStep(std::move(where_step));
}
/// Puts a LimitInRangeStep on top of the plan's current data stream, configured from the
/// FROM / TO filter column names and the remove flag stored in the analysis result.
void addLimitInRangeStep(QueryPlan & query_plan, const LimitInRangeAnalysisResult & limit_inrange_analysis_result, const std::string & step_description)
{
    const auto & analysis = limit_inrange_analysis_result;

    auto step = std::make_unique<LimitInRangeStep>(
        query_plan.getCurrentDataStream(),
        analysis.from_filter_column_name,
        analysis.to_filter_column_name,
        analysis.remove_filter_column);

    step->setStepDescription(step_description);
    query_plan.addStep(std::move(step));
}
Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context,
const AggregationAnalysisResult & aggregation_analysis_result,
const QueryAnalysisResult & query_analysis_result,
@ -1799,6 +1811,17 @@ void Planner::buildPlanForQueryNode()
bool apply_offset = query_processing_info.getToStage() != QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
if (query_node.hasLimitInrangeFrom() || query_node.hasLimitInrangeTo())
{
auto & limit_inrange_analysis_result = expression_analysis_result.getLimitInRange();
addExpressionStep(
query_plan,
limit_inrange_analysis_result.combined_limit_inrange_actions,
"LIMIT INRANGE expressions",
useful_sets);
addLimitInRangeStep(query_plan, limit_inrange_analysis_result, "LIMIT INRANGE");
}
if (query_node.hasLimit() && query_node.isLimitWithTies() && apply_offset)
addLimitStep(query_plan, query_analysis_result, planner_context, query_node);

View File

@ -1,3 +1,4 @@
#include <memory>
#include <Planner/PlannerExpressionAnalysis.h>
#include <Columns/ColumnNullable.h>
@ -60,6 +61,56 @@ std::optional<FilterAnalysisResult> analyzeFilter(const QueryTreeNodePtr & filte
result.filter_column_name = output->result_name;
actions_chain.addStep(std::make_unique<ActionsChainStep>(result.filter_actions));
return result;
}
/** Construct limit inrange analysis result for filter expression nodes (from, to).
* Either node may be null. One actions DAG is built per present node; when both are present
* the two DAGs are merged into one. The resulting actions are added into the actions chain as a new step.
* The output column names of the FROM / TO expressions are stored in the result; a name stays empty
* for a section that is absent.
* It is client responsibility to update limit inrange analysis result if filter column must be removed after chain is finalized.
*
* NOTE(review): if both nodes are null, combined_limit_inrange_actions stays null and is still wrapped
* into a chain step. The visible caller only invokes this when at least one section exists.
*/
LimitInRangeAnalysisResult analyzeLimitInRange(
const QueryTreeNodePtr & from_filter_expression_node,
const QueryTreeNodePtr & to_filter_expression_node,
const ColumnsWithTypeAndName & input_columns,
const PlannerContextPtr & planner_context,
ActionsChain & actions_chain)
{
LimitInRangeAnalysisResult result;
if (from_filter_expression_node && to_filter_expression_node)
{
/// Both sections present: build a DAG for each expression over the same input columns.
auto first_dag = buildActionsDAGFromExpressionNode(from_filter_expression_node, input_columns, planner_context);
auto second_dag = buildActionsDAGFromExpressionNode(to_filter_expression_node, input_columns, planner_context);
/// The first output of each DAG is taken as the filter column of its section.
result.from_filter_column_name = first_dag.getOutputs().at(0)->result_name;
result.to_filter_column_name = second_dag.getOutputs().at(0)->result_name;
/// Remember the name of the TO DAG's last node before second_dag is moved from below.
auto last_node_name = second_dag.getNodes().back().result_name;
first_dag.mergeNodes(std::move(second_dag));
/// Expose every merged node carrying that name as an output of the combined DAG
/// (presumably mergeNodes does not carry over the outputs of the merged DAG - TODO confirm).
for (const auto & node : first_dag.getNodes())
if (last_node_name == node.result_name)
first_dag.addOrReplaceInOutputs(node);
result.combined_limit_inrange_actions = std::make_shared<ActionsAndProjectInputsFlag>();
result.combined_limit_inrange_actions->dag = std::move(first_dag);
}
else if (from_filter_expression_node)
{
/// Only FROM is present: its DAG is used as is.
result.combined_limit_inrange_actions = std::make_shared<ActionsAndProjectInputsFlag>();
result.combined_limit_inrange_actions->dag
= buildActionsDAGFromExpressionNode(from_filter_expression_node, input_columns, planner_context);
result.from_filter_column_name = result.combined_limit_inrange_actions->dag.getOutputs().at(0)->result_name;
}
else if (to_filter_expression_node)
{
/// Only TO is present: its DAG is used as is.
result.combined_limit_inrange_actions = std::make_shared<ActionsAndProjectInputsFlag>();
result.combined_limit_inrange_actions->dag
= buildActionsDAGFromExpressionNode(to_filter_expression_node, input_columns, planner_context);
result.to_filter_column_name = result.combined_limit_inrange_actions->dag.getOutputs().at(0)->result_name;
}
actions_chain.addStep(std::make_unique<ActionsChainStep>(result.combined_limit_inrange_actions));
return result;
}
@ -634,6 +685,15 @@ PlannerExpressionsAnalysisResult buildExpressionAnalysisResult(const QueryTreeNo
current_output_columns = actions_chain.getLastStepAvailableOutputColumns();
}
std::optional<LimitInRangeAnalysisResult> limit_inrange_analysis_result_optional;
if (query_node.hasLimitInrangeFrom() || query_node.hasLimitInrangeTo())
{
limit_inrange_analysis_result_optional = analyzeLimitInRange(
query_node.getLimitInrangeFrom(), query_node.getLimitInrangeTo(), current_output_columns, planner_context, actions_chain);
current_output_columns = actions_chain.getLastStepAvailableOutputColumns();
}
const auto * chain_available_output_columns = actions_chain.getLastStepAvailableOutputColumnsOrNull();
auto project_names_input = chain_available_output_columns ? *chain_available_output_columns : current_output_columns;
@ -726,6 +786,9 @@ PlannerExpressionsAnalysisResult buildExpressionAnalysisResult(const QueryTreeNo
if (limit_by_analysis_result_optional)
expressions_analysis_result.addLimitBy(std::move(*limit_by_analysis_result_optional));
if (limit_inrange_analysis_result_optional)
expressions_analysis_result.addLimitLimitInRange(std::move(*limit_inrange_analysis_result_optional));
return expressions_analysis_result;
}

View File

@ -57,6 +57,14 @@ struct LimitByAnalysisResult
Names limit_by_column_names;
};
/// Result of analyzing the LIMIT INRANGE clause of a query.
struct LimitInRangeAnalysisResult
{
/// Actions that compute the FROM and/or TO filter columns (a single DAG even when both bounds are present).
ActionsAndProjectInputsFlagPtr combined_limit_inrange_actions;
/// Name of the column produced by the FROM expression; left empty when the query has no FROM bound.
std::string from_filter_column_name;
/// Name of the column produced by the TO expression; left empty when the query has no TO bound.
std::string to_filter_column_name;
/// Whether the filter columns are dropped after the step.
/// TODO: currently hard-coded to true; nothing visible here configures it.
bool remove_filter_column = true; // need to configure
};
class PlannerExpressionsAnalysisResult
{
public:
@ -174,6 +182,21 @@ public:
limit_by_analysis_result = std::move(limit_by_analysis_result_);
}
/// True when LIMIT INRANGE analysis was stored, i.e. its actions pointer is set.
bool hasLimitInRange() const
{
    const auto & stored_actions = limit_inrange_analysis_result.combined_limit_inrange_actions;
    return static_cast<bool>(stored_actions);
}
/// Mutable access to the stored LIMIT INRANGE analysis result.
/// The returned object is default-constructed (null actions) unless a result was stored first.
LimitInRangeAnalysisResult & getLimitInRange()
{
return limit_inrange_analysis_result;
}
/// Stores the LIMIT INRANGE analysis result, replacing any previously stored one.
/// NOTE(review): the doubled "Limit" in the name looks like a typo, but callers use it as is.
void addLimitLimitInRange(LimitInRangeAnalysisResult limit_inrange_analysis_result_)
{
limit_inrange_analysis_result = std::move(limit_inrange_analysis_result_);
}
private:
ProjectionAnalysisResult projection_analysis_result;
FilterAnalysisResult where_analysis_result;
@ -183,6 +206,7 @@ private:
FilterAnalysisResult qualify_analysis_result;
SortAnalysisResult sort_analysis_result;
LimitByAnalysisResult limit_by_analysis_result;
LimitInRangeAnalysisResult limit_inrange_analysis_result;
};
/// Build expression analysis result for query tree, join tree input columns and planner context

View File

@ -0,0 +1,86 @@
#include <IO/Operators.h>
#include <Interpreters/ExpressionActions.h>
#include <Processors/QueryPlan/LimitInRangeStep.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/LimitInRangeTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/JSONBuilder.h>
namespace DB
{
/// Traits of the LIMIT INRANGE step: the stream layout is kept, the row count is not.
static ITransformingStep::Traits getTraits()
{
    ITransformingStep::Traits step_traits{
        {
            .returns_single_stream = false,
            .preserves_number_of_streams = true,
        },
        {
            .preserves_number_of_rows = false,
        }};

    return step_traits;
}
/// Builds the step for the given input stream.
/// The output header is the input header passed through LimitInRangeTransform::transformHeader
/// (which validates the filter columns and, if requested, removes them).
LimitInRangeStep::LimitInRangeStep(
const DataStream & input_stream_, String from_filter_column_name_, String to_filter_column_name_, bool remove_filter_column_)
: ITransformingStep(
input_stream_,
/// The base class is initialized first, so the names are still intact here; they are moved into the members below.
LimitInRangeTransform::transformHeader(
input_stream_.header, from_filter_column_name_, to_filter_column_name_, remove_filter_column_),
getTraits())
, from_filter_column_name(std::move(from_filter_column_name_))
, to_filter_column_name(std::move(to_filter_column_name_))
, remove_filter_column(remove_filter_column_)
{
}
/// Appends a LimitInRangeTransform to every stream of the pipeline and, when the resulting header
/// differs from the header declared by this step, a converting expression that matches columns by name.
void LimitInRangeStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
{
    pipeline.addSimpleTransform(
        [&](const Block & header, QueryPipelineBuilder::StreamType stream_type)
        {
            const bool is_totals_stream = (stream_type == QueryPipelineBuilder::StreamType::Totals);
            return std::make_shared<LimitInRangeTransform>(
                header, from_filter_column_name, to_filter_column_name, remove_filter_column, is_totals_stream);
        });

    /// Nothing else to do when the pipeline already produces the declared output structure.
    if (blocksHaveEqualStructure(pipeline.getHeader(), output_stream->header))
        return;

    auto converting_dag = ActionsDAG::makeConvertingActions(
        pipeline.getHeader().getColumnsWithTypeAndName(),
        output_stream->header.getColumnsWithTypeAndName(),
        ActionsDAG::MatchColumnsMode::Name);

    auto converting_actions = std::make_shared<ExpressionActions>(std::move(converting_dag), settings.getActionsSettings());

    pipeline.addSimpleTransform(
        [&](const Block & header) { return std::make_shared<ExpressionTransform>(header, converting_actions); });
}
/// Writes a plain-text description of the step: one line per bound.
/// Each line carries the " (removed)" marker when the filter columns are dropped from the output.
void LimitInRangeStep::describeActions(FormatSettings & settings) const
{
    const String prefix(settings.offset, settings.indent_char);

    /// Every line is terminated separately; previously the "From" line had no newline
    /// and the two descriptions were glued together.
    settings.out << prefix << "From filter column: " << from_filter_column_name;
    if (remove_filter_column)
        settings.out << " (removed)";
    settings.out << '\n';

    settings.out << prefix << "To filter column: " << to_filter_column_name;
    if (remove_filter_column)
        settings.out << " (removed)";
    settings.out << '\n';
}
/// Adds the step parameters to a JSON description: both filter column names and the removal flag.
/// A name is added even when it is empty (i.e. the corresponding bound is not used).
void LimitInRangeStep::describeActions(JSONBuilder::JSONMap & map) const
{
map.add("From filter Column", from_filter_column_name);
map.add("To filter Column", to_filter_column_name);
map.add("Removes Filter", remove_filter_column);
}
/// Recomputes the output stream from the (possibly updated) first input stream,
/// using the same header transformation as the constructor.
void LimitInRangeStep::updateOutputStream()
{
    const auto & source_stream = input_streams.front();

    output_stream = createOutputStream(
        source_stream,
        LimitInRangeTransform::transformHeader(
            source_stream.header, from_filter_column_name, to_filter_column_name, remove_filter_column),
        getDataStreamTraits());
}
}

View File

@ -0,0 +1,32 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
namespace DB
{
/// Implements LIMIT INRANGE operation. See LimitInRangeTransform.
class LimitInRangeStep : public ITransformingStep
{
public:
/// `from_filter_column_name_` / `to_filter_column_name_` name the columns of the input header that hold
/// the FROM / TO conditions; an empty name means the bound is not used.
/// `remove_filter_column_` controls whether those columns are dropped from the output header.
LimitInRangeStep(
const DataStream & input_stream_, String from_filter_column_name_, String to_filter_column_name_, bool remove_filter_column_);
String getName() const override { return "Limit InRange"; }
/// Adds a LimitInRangeTransform to each stream of the pipeline.
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) override;
/// Description of the step parameters in JSON and in plain-text form.
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeActions(FormatSettings & settings) const override;
const String & getFromFilterColumnName() const { return from_filter_column_name; }
const String & getToFilterColumnName() const { return to_filter_column_name; }
bool removesFilterColumn() const { return remove_filter_column; }
private:
/// Rebuilds the output stream from the first input stream.
void updateOutputStream() override;
String from_filter_column_name;
String to_filter_column_name;
bool remove_filter_column;
};
}

View File

@ -0,0 +1,435 @@
#include <algorithm>
#include <Processors/Transforms/LimitInRangeTransform.h>
#include <Columns/ColumnSparse.h>
#include <Columns/ColumnsCommon.h>
#include <Core/Field.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <Interpreters/ExpressionActions.h>
#include <Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
extern const int UNKNOWN_TYPE_OF_QUERY;
}
/// Computes the output header of the transform.
///
/// For every non-empty filter column name the column must exist in `header` and have type UInt8,
/// optionally wrapped into Nullable and/or LowCardinality, or be a type that contains only NULLs;
/// otherwise ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER is thrown.
/// When `remove_filter_column` is set, the checked columns are erased from the returned header.
///
/// The FROM column is checked and erased before the TO column is looked up.
Block LimitInRangeTransform::transformHeader(
    Block header, const String & from_filter_column_name, const String & to_filter_column_name, bool remove_filter_column)
{
    if (!from_filter_column_name.empty())
    {
        const auto from_filter_type = header.getByName(from_filter_column_name).type;
        if (!from_filter_type->onlyNull() && !isUInt8(removeNullable(removeLowCardinality(from_filter_type))))
            throw Exception(
                ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER,
                "Illegal type {} of column {} for from_filter. Must be UInt8 or Nullable(UInt8).",
                from_filter_type->getName(),
                from_filter_column_name);

        if (remove_filter_column)
            header.erase(from_filter_column_name);
    }

    if (!to_filter_column_name.empty())
    {
        const auto to_filter_type = header.getByName(to_filter_column_name).type;
        if (!to_filter_type->onlyNull() && !isUInt8(removeNullable(removeLowCardinality(to_filter_type))))
            throw Exception(
                ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER,
                "Illegal type {} of column {} for to_filter. Must be UInt8 or Nullable(UInt8).",
                to_filter_type->getName(),
                to_filter_column_name);

        if (remove_filter_column)
            header.erase(to_filter_column_name);
    }

    return header;
}
/// Creates the transform for an input with structure `header_`.
/// Empty filter column names mean that the corresponding bound (FROM / TO) is not used.
/// `rows_filtered_`, when provided, accumulates the number of rows dropped by this transform.
LimitInRangeTransform::LimitInRangeTransform(
    const Block & header_,
    String from_filter_column_name_,
    String to_filter_column_name_,
    bool remove_filter_column_,
    bool on_totals_,
    std::shared_ptr<std::atomic<size_t>> rows_filtered_)
    /// The base class is initialized first, so the names can still be read here before they are moved below.
    : ISimpleTransform(header_, transformHeader(header_, from_filter_column_name_, to_filter_column_name_, remove_filter_column_), true)
    , from_filter_column_name(std::move(from_filter_column_name_))
    , to_filter_column_name(std::move(to_filter_column_name_))
    , remove_filter_column(remove_filter_column_)
    , on_totals(on_totals_)
    , rows_filtered(std::move(rows_filtered_))
{
    /// Positions are resolved against the input header, i.e. before the filter columns are removed.
    transformed_header = getInputPort().getHeader();

    if (!from_filter_column_name.empty())
        from_filter_column_position = transformed_header.getPositionByName(from_filter_column_name);

    if (!to_filter_column_name.empty())
        to_filter_column_position = transformed_header.getPositionByName(to_filter_column_name);
}
/// Port state machine of the transform.
///
/// Besides the usual push / pull handling, it finishes early once no more data is needed
/// (the TO bound was reached) and reports an error when the input ends while a TO bound
/// is configured but was never matched.
IProcessor::Status LimitInRangeTransform::prepare()
{
    /// Nobody reads the result any more.
    if (output.isFinished())
    {
        input.close();
        return Status::Finished;
    }

    if (!output.canPush())
    {
        input.setNotNeeded();
        return Status::PortFull;
    }

    /// Output if has data.
    if (has_output)
    {
        output.pushData(std::move(output_data));
        has_output = false;

        /// When the range is already closed, fall through to finish immediately.
        if (!no_more_data_needed)
            return Status::PortFull;
    }

    /// Stop if don't need more data.
    if (no_more_data_needed)
    {
        input.close();
        output.finish();
        return Status::Finished;
    }

    /// Check can input.
    if (!has_input)
    {
        if (input.isFinished())
        {
            output.finish();

            /// The whole input was consumed without ever seeing a row matching the TO expression.
            if (!to_filter_column_name.empty() && !to_index_found)
                throw Exception(
                    ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER,
                    "LIMIT INRANGE: no row matching the TO expression was found in the input");

            return Status::Finished;
        }

        input.setNeeded();

        if (!input.hasData())
            return Status::NeedData;

        input_data = input.pullData(set_input_not_needed_after_read);
        has_input = true;

        if (input_data.exception)
            /// No more data needed. Exception will be thrown (or swallowed) later.
            input.setNotNeeded();
    }

    /// Now transform.
    return Status::Ready;
}
/// Erases the filter column(s) from a non-empty chunk when `remove_filter_column` is set.
///
/// Positions refer to the input header. When both bounds are used, the column with the larger
/// position is erased first, so the smaller position stays valid whatever the relative order of
/// the FROM and TO columns is (the previous `to - 1` adjustment was only correct when the FROM
/// column preceded the TO column).
void LimitInRangeTransform::removeFilterIfNeed(Chunk & chunk) const
{
    if (!chunk || !remove_filter_column)
        return;

    const bool has_from = !from_filter_column_name.empty();
    const bool has_to = !to_filter_column_name.empty();

    if (has_from && has_to)
    {
        const size_t higher_position = std::max(from_filter_column_position, to_filter_column_position);
        const size_t lower_position = std::min(from_filter_column_position, to_filter_column_position);

        chunk.erase(higher_position);

        /// Guard against both bounds resolving to one and the same column.
        if (lower_position != higher_position)
            chunk.erase(lower_position);
    }
    else if (has_from)
    {
        chunk.erase(from_filter_column_position);
    }
    else if (has_to)
    {
        chunk.erase(to_filter_column_position);
    }
}
const IColumn::Filter * initializeColumn(const Columns & columns, size_t filter_column_position)
{
ColumnPtr filter_column = columns[filter_column_position];
ColumnPtr data_holder = nullptr;
if (filter_column->isSparse())
data_holder = recursiveRemoveSparse(filter_column->getPtr());
if (filter_column->lowCardinality() && !data_holder)
{
data_holder = filter_column->convertToFullColumnIfLowCardinality();
}
const auto & column = data_holder ? *data_holder : *filter_column;
const IColumn::Filter * filter_concrete_column = nullptr;
if (const ColumnUInt8 * concrete_column = typeid_cast<const ColumnUInt8 *>(&column))
filter_concrete_column = &concrete_column->getData();
return filter_concrete_column;
}
std::optional<size_t> findFirstMatchingIndex(const IColumn::Filter * filter)
{
if (!filter || memoryIsZero(filter->data(), 0, filter->size()))
return std::nullopt; // Return nullopt if filter is nullptr or no match can be found
auto it = std::find(filter->begin(), filter->end(), 1);
return std::distance(filter->begin(), it);
}
void LimitInRangeTransform::transform(Chunk & chunk)
{
std::cerr << "In LimitInRangeTransform::transform\n";
auto chunk_rows_before = chunk.getNumRows();
if (!from_filter_column_name.empty() && !to_filter_column_name.empty())
doFromAndToTransform(chunk);
else if (!from_filter_column_name.empty())
doFromTransform(chunk);
else if (!to_filter_column_name.empty())
doToTransform(chunk);
if (rows_filtered)
*rows_filtered += chunk_rows_before - chunk.getNumRows();
}
void LimitInRangeTransform::doFromTransform(Chunk & chunk)
{
std::cerr << "FilterTransform::doFromTransform\n";
size_t num_rows_before_filtration = chunk.getNumRows();
auto columns = chunk.detachColumns();
// DataTypes types;
// auto select_final_indices_info = getSelectByFinalIndices(chunk);
// std::cerr << constant_filter_description.always_true << " " << constant_filter_description.always_false << '\n';
if (from_index_found)
{
std::cerr << "##############\n";
std::cerr << "INDEX ALREADY EXIST;\n";
std::cerr << "##############\n";
chunk.setColumns(std::move(columns), num_rows_before_filtration);
removeFilterIfNeed(chunk);
return;
}
auto from_filter_mask = initializeColumn(columns, from_filter_column_position);
std::optional<size_t> index = findFirstMatchingIndex(from_filter_mask);
if (index.has_value())
{
from_index_found = true;
std::cerr << "##############\n";
std::cerr << "FOUND INDEX: " << index.value() << "; " << '\n';
std::cerr << "##############\n";
}
else
{
// also can be checked with memory size = 0;
std::cerr << "##############\n";
std::cerr << "NOT FOUND"
<< "; " << '\n';
std::cerr << "##############\n";
return;
}
size_t length = num_rows_before_filtration - index.value();
for (auto & col : columns)
col = col->cut(index.value(), length);
chunk.setColumns(std::move(columns), length);
removeFilterIfNeed(chunk);
}
void LimitInRangeTransform::doToTransform(Chunk & chunk)
{
std::cerr << "FilterTransform::doToTransform\n";
size_t num_rows_before_filtration = chunk.getNumRows();
auto columns = chunk.detachColumns();
// can be wrapped into filterDescription
auto to_filter_mask = initializeColumn(columns, to_filter_column_position);
std::optional<size_t> index = findFirstMatchingIndex(to_filter_mask);
if (index.has_value())
{
to_index_found = true;
std::cerr << "##############\n";
std::cerr << "FOUND INDEX: " << index.value() << "; " << '\n';
std::cerr << "##############\n";
}
else
{
// also can be checked with memory size = 0;
std::cerr << "##############\n";
std::cerr << "NOT FOUND" << ";\n";
std::cerr << "##############\n";
chunk.setColumns(std::move(columns), num_rows_before_filtration);
removeFilterIfNeed(chunk);
return;
}
size_t length = index.value() + 1;
for (auto & col : columns)
col = col->cut(0, length);
stopReading(); // optimization as to_index_found
chunk.setColumns(std::move(columns), length);
removeFilterIfNeed(chunk);
// SELECT * FROM temp_table150 LIMIT INRANGE TO id = 100
}
void LimitInRangeTransform::doFromAndToTransform(Chunk & chunk)
{
/** Main algo:
* if from_index was already found:
* search to in current:
* if found: cut(0, to_index + 1) and stop, else: add all columns to chunk
*
* search for from_index and to_index in current:
* if found from_index and to_index: cut(from_index, to_index - from_index + 1) and stop
* else if found from_index only: cut(from_index, length - from_index)
* else if found to_index only: cut(0, to_index + 1) stop and throw
* else: return empty chunk
*/
std::cerr << "FilterTransform::doFromAndToTransform\n";
size_t num_rows_before_filtration = chunk.getNumRows();
auto columns = chunk.detachColumns();
// can be wraped into filterDescription if needed(check below functions)? with including the cut
auto from_filter_mask = initializeColumn(columns, from_filter_column_position);
auto to_filter_mask = initializeColumn(columns, to_filter_column_position);
if (from_index_found)
{
std::cerr << "##############\n";
std::cerr << "FROM INDEX ALREADY EXIST" << ";\n";
std::cerr << "##############\n";
std::optional<size_t> to_index = findFirstMatchingIndex(to_filter_mask);
if (to_index.has_value())
{
to_index_found = true;
std::cerr << "##############\n";
std::cerr << "TO INDEX FOUND: " << to_index.value() << ";\n";
std::cerr << "##############\n";
for (auto & col : columns)
col = col->cut(0, to_index.value() + 1);
chunk.setColumns(std::move(columns), to_index.value() + 1);
stopReading(); // SELECT * FROM temp_table150 LIMIT INRANGE FROM id = 67000 TO id = 67100
}
else
{
// also can be checked with memory size = 0;
std::cerr << "##############\n";
std::cerr << "TO INDEX NOT FOUND" << ";\n";
std::cerr << "##############\n";
chunk.setColumns(std::move(columns), num_rows_before_filtration);
}
removeFilterIfNeed(chunk);
return;
}
std::optional<size_t> from_index = findFirstMatchingIndex(from_filter_mask);
std::optional<size_t> to_index = findFirstMatchingIndex(to_filter_mask);
if (from_index.has_value() && to_index.has_value())
{ // SELECT * FROM temp_table150 LIMIT INRANGE FROM id = 5 TO id = 100
from_index_found = true;
to_index_found = true;
// check that from_index <= to_index else throw exception
std::cerr << "##############\n";
std::cerr << "FROM AND TO INDICES FOUND: " << from_index.value() << ", " << to_index.value() << '\n';
std::cerr << "##############\n";
if (to_index.value() < from_index.value())
{ // SELECT * FROM temp_table150 LIMIT INRANGE FROM id = 100 TO id = 5
throw Exception(
ErrorCodes::UNKNOWN_TYPE_OF_QUERY,
"First occurence of to_expression (index = {}) found earlier than first occurence of from_expression (index = {})",
to_index.value(),
from_index.value());
}
for (auto & col : columns)
col = col->cut(from_index.value(), to_index.value() - from_index.value() + 1);
chunk.setColumns(std::move(columns), to_index.value() - from_index.value() + 1);
removeFilterIfNeed(chunk);
stopReading();
}
else if (from_index.has_value())
{ // SELECT * FROM temp_table150 LIMIT INRANGE FROM id = 5 TO id = 70000
std::cerr << "##############\n";
std::cerr << "FROM INDEX FOUND: " << from_index.value() << '\n';
std::cerr << "##############\n";
from_index_found = true;
for (auto & col : columns)
col = col->cut(from_index.value(), num_rows_before_filtration - from_index.value());
chunk.setColumns(std::move(columns), num_rows_before_filtration - from_index.value());
removeFilterIfNeed(chunk);
}
else if (to_index.has_value())
{ // SELECT * FROM temp_table150 LIMIT INRANGE FROM id = 90000 TO id = 100
std::cerr << "##############\n";
std::cerr << "TO INDEX FOUND earlier (when FROM not): " << to_index.value() << '\n';
std::cerr << "##############\n";
stopReading();
throw Exception(
ErrorCodes::UNKNOWN_TYPE_OF_QUERY, "First occurence of to_expression found earlier than first occurence of from_expression");
}
// nothing found, return for all;
// check in prepare what if from/to indices not found; maybe just returning nothing is ok
// or exception?
}
}

View File

@ -0,0 +1,57 @@
#pragma once
#include <Columns/FilterDescription.h>
#include <Processors/ISimpleTransform.h>
namespace DB
{
/** Implements the [LIMIT INRANGE FROM from_expr TO to_expr] operation.
  *
  * The input is expected to already contain one UInt8 column per used bound, holding the result of
  * from_expr / to_expr. The transform passes through the rows starting at the first row where the FROM
  * column is non-zero and ending at the first row where the TO column is non-zero (both inclusive).
  * Either bound may be omitted by passing an empty column name.
  * If remove_filter_column is true, the filter columns are removed from the output.
  */
class LimitInRangeTransform : public ISimpleTransform
{
public:
    LimitInRangeTransform(
        const Block & header_,
        String from_filter_column_name_,
        String to_filter_column_name_,
        bool remove_filter_column_,
        bool on_totals_,
        std::shared_ptr<std::atomic<size_t>> rows_filtered_ = nullptr);

    /// Validates the filter columns of `header` and returns the header of the transform output.
    static Block
    transformHeader(Block header, const String & from_filter_column_name, const String & to_filter_column_name, bool remove_filter_column);

    String getName() const override { return "LimitInRangeTransform"; }

    Status prepare() override;

    void transform(Chunk & chunk) override;

private:
    String from_filter_column_name;
    String to_filter_column_name;
    bool remove_filter_column;
    bool on_totals;

    ConstantFilterDescription constant_filter_description;

    /// Positions of the filter columns in the input header; meaningful only for non-empty names.
    size_t from_filter_column_position = 0;
    size_t to_filter_column_position = 0;

    /// Optional external counter of rows dropped by this transform.
    std::shared_ptr<std::atomic<size_t>> rows_filtered;

    /// Whether a row matching the FROM / TO expression has been seen so far.
    /// Initialized explicitly: a default-constructed std::atomic holds an indeterminate value before C++20.
    std::atomic<bool> from_index_found{false};
    std::atomic<bool> to_index_found{false};

    /// Header after expression, but before removing filter column.
    Block transformed_header;

    void doFromTransform(Chunk & chunk);
    void doToTransform(Chunk & chunk);
    void doFromAndToTransform(Chunk & chunk);
    void removeFilterIfNeed(Chunk & chunk) const;
};
}