diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index e0abc80b51d..dad3f82fbc4 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -689,6 +689,9 @@ static UInt64 getLimitForSorting(const ASTSelectQuery & query, const Context & c if (!query.distinct && !query.limitBy() && !query.limit_with_ties && !query.arrayJoinExpressionList() && query.limitLength()) { auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context); + if (limit_length > std::numeric_limits::max() - limit_offset) + return 0; + return limit_length + limit_offset; } return 0; @@ -1287,6 +1290,7 @@ void InterpreterSelectQuery::executeFetchColumns( && !query.limitBy() && query.limitLength() && !query_analyzer->hasAggregation() + && limit_length <= std::numeric_limits::max() - limit_offset && limit_length + limit_offset < max_block_size) { max_block_size = std::max(UInt64(1), limit_length + limit_offset); @@ -1649,8 +1653,9 @@ void InterpreterSelectQuery::executeDistinct(QueryPlan & query_plan, bool before auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, *context); UInt64 limit_for_distinct = 0; - /// If after this stage of DISTINCT ORDER BY is not executed, then you can get no more than limit_length + limit_offset of different rows. - if (!query.orderBy() || !before_order) + /// If after this stage of DISTINCT ORDER BY is not executed, + /// then you can get no more than limit_length + limit_offset of different rows. + if ((!query.orderBy() || !before_order) && limit_length <= std::numeric_limits::max() - limit_offset) limit_for_distinct = limit_length + limit_offset; SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode); @@ -1678,6 +1683,9 @@ void InterpreterSelectQuery::executePreLimit(QueryPlan & query_plan, bool do_not if (do_not_skip_offset) { + if (limit_length > std::numeric_limits::max() - limit_offset) + return; + limit_length += limit_offset; limit_offset = 0; } diff --git a/src/Processors/LimitTransform.cpp b/src/Processors/LimitTransform.cpp index fe8990f7b0f..0fdd8832419 100644 --- a/src/Processors/LimitTransform.cpp +++ b/src/Processors/LimitTransform.cpp @@ -10,7 +10,7 @@ namespace ErrorCodes } LimitTransform::LimitTransform( - const Block & header_, size_t limit_, size_t offset_, size_t num_streams, + const Block & header_, UInt64 limit_, UInt64 offset_, size_t num_streams, bool always_read_till_end_, bool with_ties_, SortDescription description_) : IProcessor(InputPorts(num_streams, header_), OutputPorts(num_streams, header_)) @@ -46,7 +46,7 @@ LimitTransform::LimitTransform( } } -Chunk LimitTransform::makeChunkWithPreviousRow(const Chunk & chunk, size_t row) const +Chunk LimitTransform::makeChunkWithPreviousRow(const Chunk & chunk, UInt64 row) const { assert(row < chunk.getNumRows()); ColumnRawPtrs current_columns = extractSortColumns(chunk.getColumns()); @@ -93,7 +93,6 @@ IProcessor::Status LimitTransform::prepare( throw Exception( "Unexpected status for LimitTransform::preparePair : " + IProcessor::statusToName(status), ErrorCodes::LOGICAL_ERROR); - } }; @@ -107,9 +106,12 @@ IProcessor::Status LimitTransform::prepare( if (num_finished_port_pairs == ports_data.size()) return Status::Finished; + bool limit_is_unreachable = (limit > std::numeric_limits::max() - offset); + /// If we reached limit for some port, then close others. Otherwise some sources may infinitely read data. /// Example: SELECT * FROM system.numbers_mt WHERE number = 1000000 LIMIT 1 - if ((rows_read >= offset + limit) && !previous_row_chunk && !always_read_till_end) + if ((!limit_is_unreachable && rows_read >= offset + limit) + && !previous_row_chunk && !always_read_till_end) { for (auto & input : inputs) input.close(); @@ -158,8 +160,10 @@ LimitTransform::Status LimitTransform::preparePair(PortsData & data) return Status::PortFull; } + bool limit_is_unreachable = (limit > std::numeric_limits::max() - offset); + /// Check if we are done with pushing. - bool is_limit_reached = (rows_read >= offset + limit) && !previous_row_chunk; + bool is_limit_reached = !limit_is_unreachable && rows_read >= offset + limit && !previous_row_chunk; if (is_limit_reached) { if (!always_read_till_end) @@ -223,7 +227,8 @@ LimitTransform::Status LimitTransform::preparePair(PortsData & data) return Status::NeedData; } - if (rows_read >= offset + rows && rows_read <= offset + limit) + if (rows <= std::numeric_limits::max() - offset && rows_read >= offset + rows + && !limit_is_unreachable && rows_read <= offset + limit) { /// Return the whole chunk. @@ -237,7 +242,7 @@ LimitTransform::Status LimitTransform::preparePair(PortsData & data) bool may_need_more_data_for_ties = previous_row_chunk || rows_read - rows <= offset + limit; /// No more data is needed. - if (!always_read_till_end && (rows_read >= offset + limit) && !may_need_more_data_for_ties) + if (!always_read_till_end && !limit_is_unreachable && rows_read >= offset + limit && !may_need_more_data_for_ties) input.close(); output.push(std::move(data.current_chunk)); @@ -249,13 +254,15 @@ LimitTransform::Status LimitTransform::preparePair(PortsData & data) void LimitTransform::splitChunk(PortsData & data) { auto current_chunk_sort_columns = extractSortColumns(data.current_chunk.getColumns()); - size_t num_rows = data.current_chunk.getNumRows(); - size_t num_columns = data.current_chunk.getNumColumns(); + UInt64 num_rows = data.current_chunk.getNumRows(); + UInt64 num_columns = data.current_chunk.getNumColumns(); - if (previous_row_chunk && rows_read >= offset + limit) + bool limit_is_unreachable = (limit > std::numeric_limits::max() - offset); + + if (previous_row_chunk && !limit_is_unreachable && rows_read >= offset + limit) { /// Scan until the first row, which is not equal to previous_row_chunk (for WITH TIES) - size_t current_row_num = 0; + UInt64 current_row_num = 0; for (; current_row_num < num_rows; ++current_row_num) { if (!sortColumnsEqualAt(current_chunk_sort_columns, current_row_num)) @@ -267,7 +274,7 @@ void LimitTransform::splitChunk(PortsData & data) if (current_row_num < num_rows) { previous_row_chunk = {}; - for (size_t i = 0; i < num_columns; ++i) + for (UInt64 i = 0; i < num_columns; ++i) columns[i] = columns[i]->cut(0, current_row_num); } @@ -276,19 +283,36 @@ void LimitTransform::splitChunk(PortsData & data) } /// return a piece of the block - size_t start = std::max( - static_cast(0), - static_cast(offset) - static_cast(rows_read) + static_cast(num_rows)); + UInt64 start = 0; - size_t length = std::min( - static_cast(limit), std::min( - static_cast(rows_read) - static_cast(offset), - static_cast(limit) + static_cast(offset) - static_cast(rows_read) + static_cast(num_rows))); + /// ------------[....(...).] + /// <----------------------> rows_read + /// <----------> num_rows + /// <---------------> offset + /// <---> start + + assert(offset < rows_read); + + if (offset + num_rows > rows_read) + start = offset + num_rows - rows_read; + + /// ------------[....(...).] + /// <----------------------> rows_read + /// <----------> num_rows + /// <---------------> offset + /// <---> limit + /// <---> length + /// <---> start + + UInt64 length = num_rows - start; + + if (!limit_is_unreachable && start + limit < num_rows) + length = limit; /// check if other rows in current block equals to last one in limit if (with_ties && length) { - size_t current_row_num = start + length; + UInt64 current_row_num = start + length; previous_row_chunk = makeChunkWithPreviousRow(data.current_chunk, current_row_num - 1); for (; current_row_num < num_rows; ++current_row_num) @@ -308,7 +332,7 @@ void LimitTransform::splitChunk(PortsData & data) auto columns = data.current_chunk.detachColumns(); - for (size_t i = 0; i < num_columns; ++i) + for (UInt64 i = 0; i < num_columns; ++i) columns[i] = columns[i]->cut(start, length); data.current_chunk.setColumns(std::move(columns), length); @@ -324,7 +348,7 @@ ColumnRawPtrs LimitTransform::extractSortColumns(const Columns & columns) const return res; } -bool LimitTransform::sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort_columns, size_t current_chunk_row_num) const +bool LimitTransform::sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort_columns, UInt64 current_chunk_row_num) const { assert(current_chunk_sort_columns.size() == previous_row_chunk.getNumColumns()); size_t size = current_chunk_sort_columns.size(); diff --git a/src/Processors/LimitTransform.h b/src/Processors/LimitTransform.h index ffa151bc064..8865eab732a 100644 --- a/src/Processors/LimitTransform.h +++ b/src/Processors/LimitTransform.h @@ -18,9 +18,9 @@ namespace DB class LimitTransform : public IProcessor { private: + UInt64 limit; + UInt64 offset; - size_t limit; - size_t offset; bool always_read_till_end; bool with_ties; @@ -29,7 +29,7 @@ private: Chunk previous_row_chunk; /// for WITH TIES, contains only sort columns std::vector sort_column_positions; - size_t rows_read = 0; /// including the last read block + UInt64 rows_read = 0; /// including the last read block RowsBeforeLimitCounterPtr rows_before_limit_at_least; /// State of port's pair. @@ -46,13 +46,13 @@ private: std::vector ports_data; size_t num_finished_port_pairs = 0; - Chunk makeChunkWithPreviousRow(const Chunk & current_chunk, size_t row_num) const; + Chunk makeChunkWithPreviousRow(const Chunk & current_chunk, UInt64 row_num) const; ColumnRawPtrs extractSortColumns(const Columns & columns) const; - bool sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort_columns, size_t current_chunk_row_num) const; + bool sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort_columns, UInt64 current_chunk_row_num) const; public: LimitTransform( - const Block & header_, size_t limit_, size_t offset_, size_t num_streams = 1, + const Block & header_, UInt64 limit_, UInt64 offset_, size_t num_streams = 1, bool always_read_till_end_ = false, bool with_ties_ = false, SortDescription description_ = {}); diff --git a/src/Processors/OffsetTransform.cpp b/src/Processors/OffsetTransform.cpp index f380a5a5159..ea93b8a4792 100644 --- a/src/Processors/OffsetTransform.cpp +++ b/src/Processors/OffsetTransform.cpp @@ -10,7 +10,7 @@ namespace ErrorCodes } OffsetTransform::OffsetTransform( - const Block & header_, size_t offset_, size_t num_streams) + const Block & header_, UInt64 offset_, size_t num_streams) : IProcessor(InputPorts(num_streams, header_), OutputPorts(num_streams, header_)) , offset(offset_) { @@ -135,7 +135,7 @@ OffsetTransform::Status OffsetTransform::preparePair(PortsData & data) rows_read += rows; - if (rows_read < offset) + if (rows_read <= offset) { data.current_chunk.clear(); @@ -150,7 +150,7 @@ OffsetTransform::Status OffsetTransform::preparePair(PortsData & data) return Status::NeedData; } - if (!(rows_read >= offset + rows)) + if (!(rows <= std::numeric_limits::max() - offset && rows_read >= offset + rows)) splitChunk(data); output.push(std::move(data.current_chunk)); @@ -161,22 +161,30 @@ OffsetTransform::Status OffsetTransform::preparePair(PortsData & data) void OffsetTransform::splitChunk(PortsData & data) const { - size_t num_rows = data.current_chunk.getNumRows(); - size_t num_columns = data.current_chunk.getNumColumns(); + UInt64 num_rows = data.current_chunk.getNumRows(); + UInt64 num_columns = data.current_chunk.getNumColumns(); /// return a piece of the block - size_t start = std::max( - static_cast(0), - static_cast(offset) - static_cast(rows_read) + static_cast(num_rows)); + UInt64 start = 0; - size_t length = static_cast(rows_read) - static_cast(offset); + /// ------------[....(...).] + /// <----------------------> rows_read + /// <----------> num_rows + /// <---------------> offset + /// <---> start - if (length == num_rows) + assert(offset < rows_read); + + if (offset + num_rows > rows_read) + start = offset + num_rows - rows_read; + else return; + UInt64 length = num_rows - start; + auto columns = data.current_chunk.detachColumns(); - for (size_t i = 0; i < num_columns; ++i) + for (UInt64 i = 0; i < num_columns; ++i) columns[i] = columns[i]->cut(start, length); data.current_chunk.setColumns(std::move(columns), length); diff --git a/src/Processors/OffsetTransform.h b/src/Processors/OffsetTransform.h index 905e8298d15..47f500ceb0d 100644 --- a/src/Processors/OffsetTransform.h +++ b/src/Processors/OffsetTransform.h @@ -13,10 +13,9 @@ namespace DB class OffsetTransform : public IProcessor { private: + UInt64 offset; + UInt64 rows_read = 0; /// including the last read block - size_t offset; - - size_t rows_read = 0; /// including the last read block RowsBeforeLimitCounterPtr rows_before_limit_at_least; /// State of port's pair. diff --git a/src/Processors/Transforms/LimitByTransform.cpp b/src/Processors/Transforms/LimitByTransform.cpp index 8891ae11c03..5c405046a83 100644 --- a/src/Processors/Transforms/LimitByTransform.cpp +++ b/src/Processors/Transforms/LimitByTransform.cpp @@ -5,7 +5,7 @@ namespace DB { -LimitByTransform::LimitByTransform(const Block & header, size_t group_length_, size_t group_offset_, const Names & columns) +LimitByTransform::LimitByTransform(const Block & header, UInt64 group_length_, UInt64 group_offset_, const Names & columns) : ISimpleTransform(header, header, true) , group_length(group_length_) , group_offset(group_offset_) @@ -25,13 +25,13 @@ LimitByTransform::LimitByTransform(const Block & header, size_t group_length_, s void LimitByTransform::transform(Chunk & chunk) { - size_t num_rows = chunk.getNumRows(); + UInt64 num_rows = chunk.getNumRows(); auto columns = chunk.detachColumns(); IColumn::Filter filter(num_rows); - size_t inserted_count = 0; + UInt64 inserted_count = 0; - for (size_t row = 0; row < num_rows; ++row) + for (UInt64 row = 0; row < num_rows; ++row) { UInt128 key(0, 0); SipHash hash; @@ -42,9 +42,10 @@ void LimitByTransform::transform(Chunk & chunk) hash.get128(key.low, key.high); auto count = keys_counts[key]++; - if (count >= group_offset && count < group_length + group_offset) + if (count >= group_offset + && (group_length > std::numeric_limits::max() - group_offset || count < group_length + group_offset)) { - inserted_count++; + ++inserted_count; filter[row] = 1; } else diff --git a/src/Processors/Transforms/LimitByTransform.h b/src/Processors/Transforms/LimitByTransform.h index 815114946c8..9773f637f40 100644 --- a/src/Processors/Transforms/LimitByTransform.h +++ b/src/Processors/Transforms/LimitByTransform.h @@ -10,7 +10,7 @@ namespace DB class LimitByTransform : public ISimpleTransform { public: - LimitByTransform(const Block & header, size_t group_length_, size_t group_offset_, const Names & columns); + LimitByTransform(const Block & header, UInt64 group_length_, UInt64 group_offset_, const Names & columns); String getName() const override { return "LimitByTransform"; } @@ -22,8 +22,8 @@ private: MapHashed keys_counts; std::vector key_positions; - const size_t group_length; - const size_t group_offset; + const UInt64 group_length; + const UInt64 group_offset; }; } diff --git a/tests/queries/0_stateless/01391_limit_overflow.reference b/tests/queries/0_stateless/01391_limit_overflow.reference new file mode 100644 index 00000000000..6fa0a9ec2b9 --- /dev/null +++ b/tests/queries/0_stateless/01391_limit_overflow.reference @@ -0,0 +1,14 @@ +2 +3 +4 +5 +6 +7 +8 +9 +--- +5 +6 +7 +8 +9 diff --git a/tests/queries/0_stateless/01391_limit_overflow.sql b/tests/queries/0_stateless/01391_limit_overflow.sql new file mode 100644 index 00000000000..b06622799f7 --- /dev/null +++ b/tests/queries/0_stateless/01391_limit_overflow.sql @@ -0,0 +1,12 @@ +SELECT number FROM numbers(10) ORDER BY number ASC LIMIT 2, 9223372036854775807 WITH TIES; + +SELECT '---'; + +CREATE TEMPORARY TABLE a (a UInt64); +INSERT INTO TABLE a SELECT number FROM system.numbers LIMIT 10; + +SELECT a +FROM a +GROUP BY a +ORDER BY a ASC +LIMIT 5, 18446744073709551615;