Add new feature: limit length and offset setting (#17633)

Co-authored-by: Alexander Kazakov <Akazz@users.noreply.github.com>
This commit is contained in:
hexiaoting 2020-12-17 22:16:14 +08:00 committed by GitHub
parent 0f3582883c
commit cb91d64fe7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 195 additions and 26 deletions

View File

@ -239,6 +239,8 @@ class IColumn;
* Almost all limits apply to each stream individually. \
*/ \
\
M(UInt64, limit, 0, "Limit on read rows from the most 'end' result for select query, default 0 means no limit length", 0) \
M(UInt64, offset, 0, "Offset on read rows from the most 'end' result for select query", 0) \
M(UInt64, max_rows_to_read, 0, "Limit on read rows from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it is only checked on a remote server.", 0) \
M(UInt64, max_bytes_to_read, 0, "Limit on read bytes (after decompression) from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it is only checked on a remote server.", 0) \
M(OverflowMode, read_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \

View File

@ -34,6 +34,8 @@ protected:
Block result_header;
SelectQueryOptions options;
size_t max_streams = 1;
bool settings_limit_offset_needed = false;
bool settings_limit_offset_done = false;
};
}

View File

@ -9,10 +9,14 @@
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPlan/LimitStep.h>
#include <Processors/QueryPlan/OffsetStep.h>
#include <Common/typeid_cast.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <algorithm>
namespace DB
{
@ -130,10 +134,14 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
{
ASTSelectWithUnionQuery * ast = query_ptr->as<ASTSelectWithUnionQuery>();
const Settings & settings = context->getSettingsRef();
if (options.subquery_depth == 0 && (settings.limit > 0 || settings.offset > 0))
settings_limit_offset_needed = true;
/// Normalize AST Tree
if (!ast->is_normalized)
{
CustomizeASTSelectWithUnionQueryNormalizeVisitor::Data union_default_mode{context->getSettingsRef().union_default_mode};
CustomizeASTSelectWithUnionQueryNormalizeVisitor::Data union_default_mode{settings.union_default_mode};
CustomizeASTSelectWithUnionQueryNormalizeVisitor(union_default_mode).visit(query_ptr);
/// After normalization, if it only has one ASTSelectWithUnionQuery child,
@ -186,6 +194,52 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
}
}
if (num_children == 1 && settings_limit_offset_needed)
{
const ASTPtr first_select_ast = ast->list_of_selects->children.at(0);
ASTSelectQuery * select_query = first_select_ast->as<ASTSelectQuery>();
if (!select_query->withFill() && !select_query->limit_with_ties)
{
UInt64 limit_length = 0;
UInt64 limit_offset = 0;
const ASTPtr limit_offset_ast = select_query->limitOffset();
if (limit_offset_ast)
{
limit_offset = limit_offset_ast->as<ASTLiteral &>().value.safeGet<UInt64>();
UInt64 new_limit_offset = settings.offset + limit_offset;
limit_offset_ast->as<ASTLiteral &>().value = Field(new_limit_offset);
}
else if (settings.offset)
{
ASTPtr new_limit_offset_ast = std::make_shared<ASTLiteral>(Field(UInt64(settings.offset)));
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_OFFSET, std::move(new_limit_offset_ast));
}
const ASTPtr limit_length_ast = select_query->limitLength();
if (limit_length_ast)
{
limit_length = limit_length_ast->as<ASTLiteral &>().value.safeGet<UInt64>();
UInt64 new_limit_length = 0;
if (settings.offset == 0)
new_limit_length = std::min(limit_length, UInt64(settings.limit));
else if (settings.offset < limit_length)
new_limit_length = settings.limit ? std::min(UInt64(settings.limit), limit_length - settings.offset) : (limit_length - settings.offset);
limit_length_ast->as<ASTLiteral &>().value = Field(new_limit_length);
}
else if (settings.limit)
{
ASTPtr new_limit_length_ast = std::make_shared<ASTLiteral>(Field(UInt64(settings.limit)));
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_LENGTH, std::move(new_limit_length_ast));
}
settings_limit_offset_done = true;
}
}
for (size_t query_num = 0; query_num < num_children; ++query_num)
{
const Names & current_required_result_column_names
@ -293,39 +347,57 @@ void InterpreterSelectWithUnionQuery::buildQueryPlan(QueryPlan & query_plan)
{
// auto num_distinct_union = optimizeUnionList();
size_t num_plans = nested_interpreters.size();
const Settings & settings = context->getSettingsRef();
/// Skip union for single interpreter.
if (num_plans == 1)
{
nested_interpreters.front()->buildQueryPlan(query_plan);
return;
}
else
{
std::vector<std::unique_ptr<QueryPlan>> plans(num_plans);
DataStreams data_streams(num_plans);
for (size_t i = 0; i < num_plans; ++i)
{
plans[i] = std::make_unique<QueryPlan>();
nested_interpreters[i]->buildQueryPlan(*plans[i]);
data_streams[i] = plans[i]->getCurrentDataStream();
}
auto max_threads = context->getSettingsRef().max_threads;
auto union_step = std::make_unique<UnionStep>(std::move(data_streams), result_header, max_threads);
query_plan.unitePlans(std::move(union_step), std::move(plans));
const auto & query = query_ptr->as<ASTSelectWithUnionQuery &>();
if (query.union_mode == ASTSelectWithUnionQuery::Mode::DISTINCT)
{
/// Add distinct transform
SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode);
auto distinct_step
= std::make_unique<DistinctStep>(query_plan.getCurrentDataStream(), limits, 0, result_header.getNames(), false);
query_plan.addStep(std::move(distinct_step));
}
}
std::vector<std::unique_ptr<QueryPlan>> plans(num_plans);
DataStreams data_streams(num_plans);
for (size_t i = 0; i < num_plans; ++i)
if (settings_limit_offset_needed && !settings_limit_offset_done)
{
plans[i] = std::make_unique<QueryPlan>();
nested_interpreters[i]->buildQueryPlan(*plans[i]);
data_streams[i] = plans[i]->getCurrentDataStream();
}
auto max_threads = context->getSettingsRef().max_threads;
auto union_step = std::make_unique<UnionStep>(std::move(data_streams), result_header, max_threads);
query_plan.unitePlans(std::move(union_step), std::move(plans));
const auto & query = query_ptr->as<ASTSelectWithUnionQuery &>();
if (query.union_mode == ASTSelectWithUnionQuery::Mode::DISTINCT)
{
/// Add distinct transform
const Settings & settings = context->getSettingsRef();
SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode);
auto distinct_step = std::make_unique<DistinctStep>(query_plan.getCurrentDataStream(), limits, 0, result_header.getNames(), false);
query_plan.addStep(std::move(distinct_step));
if (settings.limit > 0)
{
auto limit = std::make_unique<LimitStep>(query_plan.getCurrentDataStream(), settings.limit, settings.offset);
limit->setStepDescription("LIMIT OFFSET for SETTINGS");
query_plan.addStep(std::move(limit));
}
else
{
auto offset = std::make_unique<OffsetStep>(query_plan.getCurrentDataStream(), settings.offset);
offset->setStepDescription("OFFSET for SETTINGS");
query_plan.addStep(std::move(offset));
}
}
}

View File

@ -0,0 +1,63 @@
0
1
2
3
4
15
15
16
16
17
30
30
31
31
32
102
103
104
105
105
106
107
108
109
105
106
107
108
109
60
60
61
61
62
62
63
63
64
64
60
35
35
36
36
37
37
38
38
39
39
105
106
107
108
109
12
13
13
14
14
15
15
16

View File

@ -0,0 +1,30 @@
DROP TABLE IF EXISTS test;
CREATE TABLE test (i UInt64) Engine = MergeTree() order by i;
INSERT INTO test SELECT number FROM numbers(100);
INSERT INTO test SELECT number FROM numbers(10,100);
OPTIMIZE TABLE test FINAL;
-- Only set limit
SET limit = 5;
SELECT * FROM test; -- 5 rows
SELECT * FROM test OFFSET 20; -- 5 rows
SELECT * FROM (SELECT i FROM test LIMIT 10 OFFSET 50) TMP; -- 5 rows
SELECT * FROM test LIMIT 4 OFFSET 192; -- 4 rows
SELECT * FROM test LIMIT 10 OFFSET 195; -- 5 rows
-- Only set offset
SET limit = 0;
SET offset = 195;
SELECT * FROM test; -- 5 rows
SELECT * FROM test OFFSET 20; -- no result
SELECT * FROM test LIMIT 100; -- no result
SET offset = 10;
SELECT * FROM test LIMIT 20 OFFSET 100; -- 10 rows
SELECT * FROM test LIMIT 11 OFFSET 100; -- 1 rows
-- offset and limit together
SET limit = 10;
SELECT * FROM test LIMIT 50 OFFSET 50; -- 10 rows
SELECT * FROM test LIMIT 50 OFFSET 190; -- 0 rows
SELECT * FROM test LIMIT 50 OFFSET 185; -- 5 rows
SELECT * FROM test LIMIT 18 OFFSET 5; -- 8 rows