mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
parent
26d32f2ce5
commit
ef930df484
@ -689,6 +689,9 @@ static UInt64 getLimitForSorting(const ASTSelectQuery & query, const Context & c
|
||||
if (!query.distinct && !query.limitBy() && !query.limit_with_ties && !query.arrayJoinExpressionList() && query.limitLength())
|
||||
{
|
||||
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
|
||||
if (limit_length > std::numeric_limits<UInt64>::max() - limit_offset)
|
||||
return 0;
|
||||
|
||||
return limit_length + limit_offset;
|
||||
}
|
||||
return 0;
|
||||
@ -1287,6 +1290,7 @@ void InterpreterSelectQuery::executeFetchColumns(
|
||||
&& !query.limitBy()
|
||||
&& query.limitLength()
|
||||
&& !query_analyzer->hasAggregation()
|
||||
&& limit_length <= std::numeric_limits<UInt64>::max() - limit_offset
|
||||
&& limit_length + limit_offset < max_block_size)
|
||||
{
|
||||
max_block_size = std::max(UInt64(1), limit_length + limit_offset);
|
||||
@ -1649,8 +1653,9 @@ void InterpreterSelectQuery::executeDistinct(QueryPlan & query_plan, bool before
|
||||
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, *context);
|
||||
UInt64 limit_for_distinct = 0;
|
||||
|
||||
/// If after this stage of DISTINCT ORDER BY is not executed, then you can get no more than limit_length + limit_offset of different rows.
|
||||
if (!query.orderBy() || !before_order)
|
||||
/// If after this stage of DISTINCT ORDER BY is not executed,
|
||||
/// then you can get no more than limit_length + limit_offset of different rows.
|
||||
if ((!query.orderBy() || !before_order) && limit_length <= std::numeric_limits<UInt64>::max() - limit_offset)
|
||||
limit_for_distinct = limit_length + limit_offset;
|
||||
|
||||
SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode);
|
||||
@ -1678,6 +1683,9 @@ void InterpreterSelectQuery::executePreLimit(QueryPlan & query_plan, bool do_not
|
||||
|
||||
if (do_not_skip_offset)
|
||||
{
|
||||
if (limit_length > std::numeric_limits<UInt64>::max() - limit_offset)
|
||||
return;
|
||||
|
||||
limit_length += limit_offset;
|
||||
limit_offset = 0;
|
||||
}
|
||||
|
@ -10,7 +10,7 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
LimitTransform::LimitTransform(
|
||||
const Block & header_, size_t limit_, size_t offset_, size_t num_streams,
|
||||
const Block & header_, UInt64 limit_, UInt64 offset_, size_t num_streams,
|
||||
bool always_read_till_end_, bool with_ties_,
|
||||
SortDescription description_)
|
||||
: IProcessor(InputPorts(num_streams, header_), OutputPorts(num_streams, header_))
|
||||
@ -46,7 +46,7 @@ LimitTransform::LimitTransform(
|
||||
}
|
||||
}
|
||||
|
||||
Chunk LimitTransform::makeChunkWithPreviousRow(const Chunk & chunk, size_t row) const
|
||||
Chunk LimitTransform::makeChunkWithPreviousRow(const Chunk & chunk, UInt64 row) const
|
||||
{
|
||||
assert(row < chunk.getNumRows());
|
||||
ColumnRawPtrs current_columns = extractSortColumns(chunk.getColumns());
|
||||
@ -93,7 +93,6 @@ IProcessor::Status LimitTransform::prepare(
|
||||
throw Exception(
|
||||
"Unexpected status for LimitTransform::preparePair : " + IProcessor::statusToName(status),
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
@ -107,9 +106,12 @@ IProcessor::Status LimitTransform::prepare(
|
||||
if (num_finished_port_pairs == ports_data.size())
|
||||
return Status::Finished;
|
||||
|
||||
bool limit_is_unreachable = (limit > std::numeric_limits<UInt64>::max() - offset);
|
||||
|
||||
/// If we reached limit for some port, then close others. Otherwise some sources may infinitely read data.
|
||||
/// Example: SELECT * FROM system.numbers_mt WHERE number = 1000000 LIMIT 1
|
||||
if ((rows_read >= offset + limit) && !previous_row_chunk && !always_read_till_end)
|
||||
if ((!limit_is_unreachable && rows_read >= offset + limit)
|
||||
&& !previous_row_chunk && !always_read_till_end)
|
||||
{
|
||||
for (auto & input : inputs)
|
||||
input.close();
|
||||
@ -158,8 +160,10 @@ LimitTransform::Status LimitTransform::preparePair(PortsData & data)
|
||||
return Status::PortFull;
|
||||
}
|
||||
|
||||
bool limit_is_unreachable = (limit > std::numeric_limits<UInt64>::max() - offset);
|
||||
|
||||
/// Check if we are done with pushing.
|
||||
bool is_limit_reached = (rows_read >= offset + limit) && !previous_row_chunk;
|
||||
bool is_limit_reached = !limit_is_unreachable && rows_read >= offset + limit && !previous_row_chunk;
|
||||
if (is_limit_reached)
|
||||
{
|
||||
if (!always_read_till_end)
|
||||
@ -223,7 +227,8 @@ LimitTransform::Status LimitTransform::preparePair(PortsData & data)
|
||||
return Status::NeedData;
|
||||
}
|
||||
|
||||
if (rows_read >= offset + rows && rows_read <= offset + limit)
|
||||
if (rows <= std::numeric_limits<UInt64>::max() - offset && rows_read >= offset + rows
|
||||
&& !limit_is_unreachable && rows_read <= offset + limit)
|
||||
{
|
||||
/// Return the whole chunk.
|
||||
|
||||
@ -237,7 +242,7 @@ LimitTransform::Status LimitTransform::preparePair(PortsData & data)
|
||||
|
||||
bool may_need_more_data_for_ties = previous_row_chunk || rows_read - rows <= offset + limit;
|
||||
/// No more data is needed.
|
||||
if (!always_read_till_end && (rows_read >= offset + limit) && !may_need_more_data_for_ties)
|
||||
if (!always_read_till_end && !limit_is_unreachable && rows_read >= offset + limit && !may_need_more_data_for_ties)
|
||||
input.close();
|
||||
|
||||
output.push(std::move(data.current_chunk));
|
||||
@ -249,13 +254,15 @@ LimitTransform::Status LimitTransform::preparePair(PortsData & data)
|
||||
void LimitTransform::splitChunk(PortsData & data)
|
||||
{
|
||||
auto current_chunk_sort_columns = extractSortColumns(data.current_chunk.getColumns());
|
||||
size_t num_rows = data.current_chunk.getNumRows();
|
||||
size_t num_columns = data.current_chunk.getNumColumns();
|
||||
UInt64 num_rows = data.current_chunk.getNumRows();
|
||||
UInt64 num_columns = data.current_chunk.getNumColumns();
|
||||
|
||||
if (previous_row_chunk && rows_read >= offset + limit)
|
||||
bool limit_is_unreachable = (limit > std::numeric_limits<UInt64>::max() - offset);
|
||||
|
||||
if (previous_row_chunk && !limit_is_unreachable && rows_read >= offset + limit)
|
||||
{
|
||||
/// Scan until the first row, which is not equal to previous_row_chunk (for WITH TIES)
|
||||
size_t current_row_num = 0;
|
||||
UInt64 current_row_num = 0;
|
||||
for (; current_row_num < num_rows; ++current_row_num)
|
||||
{
|
||||
if (!sortColumnsEqualAt(current_chunk_sort_columns, current_row_num))
|
||||
@ -267,7 +274,7 @@ void LimitTransform::splitChunk(PortsData & data)
|
||||
if (current_row_num < num_rows)
|
||||
{
|
||||
previous_row_chunk = {};
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
for (UInt64 i = 0; i < num_columns; ++i)
|
||||
columns[i] = columns[i]->cut(0, current_row_num);
|
||||
}
|
||||
|
||||
@ -276,19 +283,36 @@ void LimitTransform::splitChunk(PortsData & data)
|
||||
}
|
||||
|
||||
/// return a piece of the block
|
||||
size_t start = std::max(
|
||||
static_cast<Int64>(0),
|
||||
static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows));
|
||||
UInt64 start = 0;
|
||||
|
||||
size_t length = std::min(
|
||||
static_cast<Int64>(limit), std::min(
|
||||
static_cast<Int64>(rows_read) - static_cast<Int64>(offset),
|
||||
static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows)));
|
||||
/// ------------[....(...).]
|
||||
/// <----------------------> rows_read
|
||||
/// <----------> num_rows
|
||||
/// <---------------> offset
|
||||
/// <---> start
|
||||
|
||||
assert(offset < rows_read);
|
||||
|
||||
if (offset + num_rows > rows_read)
|
||||
start = offset + num_rows - rows_read;
|
||||
|
||||
/// ------------[....(...).]
|
||||
/// <----------------------> rows_read
|
||||
/// <----------> num_rows
|
||||
/// <---------------> offset
|
||||
/// <---> limit
|
||||
/// <---> length
|
||||
/// <---> start
|
||||
|
||||
UInt64 length = num_rows - start;
|
||||
|
||||
if (!limit_is_unreachable && start + limit < num_rows)
|
||||
length = limit;
|
||||
|
||||
/// check if other rows in current block equals to last one in limit
|
||||
if (with_ties && length)
|
||||
{
|
||||
size_t current_row_num = start + length;
|
||||
UInt64 current_row_num = start + length;
|
||||
previous_row_chunk = makeChunkWithPreviousRow(data.current_chunk, current_row_num - 1);
|
||||
|
||||
for (; current_row_num < num_rows; ++current_row_num)
|
||||
@ -308,7 +332,7 @@ void LimitTransform::splitChunk(PortsData & data)
|
||||
|
||||
auto columns = data.current_chunk.detachColumns();
|
||||
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
for (UInt64 i = 0; i < num_columns; ++i)
|
||||
columns[i] = columns[i]->cut(start, length);
|
||||
|
||||
data.current_chunk.setColumns(std::move(columns), length);
|
||||
@ -324,7 +348,7 @@ ColumnRawPtrs LimitTransform::extractSortColumns(const Columns & columns) const
|
||||
return res;
|
||||
}
|
||||
|
||||
bool LimitTransform::sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort_columns, size_t current_chunk_row_num) const
|
||||
bool LimitTransform::sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort_columns, UInt64 current_chunk_row_num) const
|
||||
{
|
||||
assert(current_chunk_sort_columns.size() == previous_row_chunk.getNumColumns());
|
||||
size_t size = current_chunk_sort_columns.size();
|
||||
|
@ -18,9 +18,9 @@ namespace DB
|
||||
class LimitTransform : public IProcessor
|
||||
{
|
||||
private:
|
||||
UInt64 limit;
|
||||
UInt64 offset;
|
||||
|
||||
size_t limit;
|
||||
size_t offset;
|
||||
bool always_read_till_end;
|
||||
|
||||
bool with_ties;
|
||||
@ -29,7 +29,7 @@ private:
|
||||
Chunk previous_row_chunk; /// for WITH TIES, contains only sort columns
|
||||
std::vector<size_t> sort_column_positions;
|
||||
|
||||
size_t rows_read = 0; /// including the last read block
|
||||
UInt64 rows_read = 0; /// including the last read block
|
||||
RowsBeforeLimitCounterPtr rows_before_limit_at_least;
|
||||
|
||||
/// State of port's pair.
|
||||
@ -46,13 +46,13 @@ private:
|
||||
std::vector<PortsData> ports_data;
|
||||
size_t num_finished_port_pairs = 0;
|
||||
|
||||
Chunk makeChunkWithPreviousRow(const Chunk & current_chunk, size_t row_num) const;
|
||||
Chunk makeChunkWithPreviousRow(const Chunk & current_chunk, UInt64 row_num) const;
|
||||
ColumnRawPtrs extractSortColumns(const Columns & columns) const;
|
||||
bool sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort_columns, size_t current_chunk_row_num) const;
|
||||
bool sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort_columns, UInt64 current_chunk_row_num) const;
|
||||
|
||||
public:
|
||||
LimitTransform(
|
||||
const Block & header_, size_t limit_, size_t offset_, size_t num_streams = 1,
|
||||
const Block & header_, UInt64 limit_, UInt64 offset_, size_t num_streams = 1,
|
||||
bool always_read_till_end_ = false, bool with_ties_ = false,
|
||||
SortDescription description_ = {});
|
||||
|
||||
|
@ -10,7 +10,7 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
OffsetTransform::OffsetTransform(
|
||||
const Block & header_, size_t offset_, size_t num_streams)
|
||||
const Block & header_, UInt64 offset_, size_t num_streams)
|
||||
: IProcessor(InputPorts(num_streams, header_), OutputPorts(num_streams, header_))
|
||||
, offset(offset_)
|
||||
{
|
||||
@ -135,7 +135,7 @@ OffsetTransform::Status OffsetTransform::preparePair(PortsData & data)
|
||||
|
||||
rows_read += rows;
|
||||
|
||||
if (rows_read < offset)
|
||||
if (rows_read <= offset)
|
||||
{
|
||||
data.current_chunk.clear();
|
||||
|
||||
@ -150,7 +150,7 @@ OffsetTransform::Status OffsetTransform::preparePair(PortsData & data)
|
||||
return Status::NeedData;
|
||||
}
|
||||
|
||||
if (!(rows_read >= offset + rows))
|
||||
if (!(rows <= std::numeric_limits<UInt64>::max() - offset && rows_read >= offset + rows))
|
||||
splitChunk(data);
|
||||
|
||||
output.push(std::move(data.current_chunk));
|
||||
@ -161,22 +161,30 @@ OffsetTransform::Status OffsetTransform::preparePair(PortsData & data)
|
||||
|
||||
void OffsetTransform::splitChunk(PortsData & data) const
|
||||
{
|
||||
size_t num_rows = data.current_chunk.getNumRows();
|
||||
size_t num_columns = data.current_chunk.getNumColumns();
|
||||
UInt64 num_rows = data.current_chunk.getNumRows();
|
||||
UInt64 num_columns = data.current_chunk.getNumColumns();
|
||||
|
||||
/// return a piece of the block
|
||||
size_t start = std::max(
|
||||
static_cast<Int64>(0),
|
||||
static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows));
|
||||
UInt64 start = 0;
|
||||
|
||||
size_t length = static_cast<Int64>(rows_read) - static_cast<Int64>(offset);
|
||||
/// ------------[....(...).]
|
||||
/// <----------------------> rows_read
|
||||
/// <----------> num_rows
|
||||
/// <---------------> offset
|
||||
/// <---> start
|
||||
|
||||
if (length == num_rows)
|
||||
assert(offset < rows_read);
|
||||
|
||||
if (offset + num_rows > rows_read)
|
||||
start = offset + num_rows - rows_read;
|
||||
else
|
||||
return;
|
||||
|
||||
UInt64 length = num_rows - start;
|
||||
|
||||
auto columns = data.current_chunk.detachColumns();
|
||||
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
for (UInt64 i = 0; i < num_columns; ++i)
|
||||
columns[i] = columns[i]->cut(start, length);
|
||||
|
||||
data.current_chunk.setColumns(std::move(columns), length);
|
||||
|
@ -13,10 +13,9 @@ namespace DB
|
||||
class OffsetTransform : public IProcessor
|
||||
{
|
||||
private:
|
||||
UInt64 offset;
|
||||
UInt64 rows_read = 0; /// including the last read block
|
||||
|
||||
size_t offset;
|
||||
|
||||
size_t rows_read = 0; /// including the last read block
|
||||
RowsBeforeLimitCounterPtr rows_before_limit_at_least;
|
||||
|
||||
/// State of port's pair.
|
||||
|
@ -5,7 +5,7 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
LimitByTransform::LimitByTransform(const Block & header, size_t group_length_, size_t group_offset_, const Names & columns)
|
||||
LimitByTransform::LimitByTransform(const Block & header, UInt64 group_length_, UInt64 group_offset_, const Names & columns)
|
||||
: ISimpleTransform(header, header, true)
|
||||
, group_length(group_length_)
|
||||
, group_offset(group_offset_)
|
||||
@ -25,13 +25,13 @@ LimitByTransform::LimitByTransform(const Block & header, size_t group_length_, s
|
||||
|
||||
void LimitByTransform::transform(Chunk & chunk)
|
||||
{
|
||||
size_t num_rows = chunk.getNumRows();
|
||||
UInt64 num_rows = chunk.getNumRows();
|
||||
auto columns = chunk.detachColumns();
|
||||
|
||||
IColumn::Filter filter(num_rows);
|
||||
size_t inserted_count = 0;
|
||||
UInt64 inserted_count = 0;
|
||||
|
||||
for (size_t row = 0; row < num_rows; ++row)
|
||||
for (UInt64 row = 0; row < num_rows; ++row)
|
||||
{
|
||||
UInt128 key(0, 0);
|
||||
SipHash hash;
|
||||
@ -42,9 +42,10 @@ void LimitByTransform::transform(Chunk & chunk)
|
||||
hash.get128(key.low, key.high);
|
||||
|
||||
auto count = keys_counts[key]++;
|
||||
if (count >= group_offset && count < group_length + group_offset)
|
||||
if (count >= group_offset
|
||||
&& (group_length > std::numeric_limits<UInt64>::max() - group_offset || count < group_length + group_offset))
|
||||
{
|
||||
inserted_count++;
|
||||
++inserted_count;
|
||||
filter[row] = 1;
|
||||
}
|
||||
else
|
||||
|
@ -10,7 +10,7 @@ namespace DB
|
||||
class LimitByTransform : public ISimpleTransform
|
||||
{
|
||||
public:
|
||||
LimitByTransform(const Block & header, size_t group_length_, size_t group_offset_, const Names & columns);
|
||||
LimitByTransform(const Block & header, UInt64 group_length_, UInt64 group_offset_, const Names & columns);
|
||||
|
||||
String getName() const override { return "LimitByTransform"; }
|
||||
|
||||
@ -22,8 +22,8 @@ private:
|
||||
|
||||
MapHashed keys_counts;
|
||||
std::vector<size_t> key_positions;
|
||||
const size_t group_length;
|
||||
const size_t group_offset;
|
||||
const UInt64 group_length;
|
||||
const UInt64 group_offset;
|
||||
};
|
||||
|
||||
}
|
||||
|
14
tests/queries/0_stateless/01391_limit_overflow.reference
Normal file
14
tests/queries/0_stateless/01391_limit_overflow.reference
Normal file
@ -0,0 +1,14 @@
|
||||
2
|
||||
3
|
||||
4
|
||||
5
|
||||
6
|
||||
7
|
||||
8
|
||||
9
|
||||
---
|
||||
5
|
||||
6
|
||||
7
|
||||
8
|
||||
9
|
12
tests/queries/0_stateless/01391_limit_overflow.sql
Normal file
12
tests/queries/0_stateless/01391_limit_overflow.sql
Normal file
@ -0,0 +1,12 @@
|
||||
SELECT number FROM numbers(10) ORDER BY number ASC LIMIT 2, 9223372036854775807 WITH TIES;
|
||||
|
||||
SELECT '---';
|
||||
|
||||
CREATE TEMPORARY TABLE a (a UInt64);
|
||||
INSERT INTO TABLE a SELECT number FROM system.numbers LIMIT 10;
|
||||
|
||||
SELECT a
|
||||
FROM a
|
||||
GROUP BY a
|
||||
ORDER BY a ASC
|
||||
LIMIT 5, 18446744073709551615;
|
Loading…
Reference in New Issue
Block a user