diff --git a/dbms/src/Common/SharedBlockRowRef.h b/dbms/src/Common/SharedBlockRowRef.h index 8977fedc50e..d37bcbe193a 100644 --- a/dbms/src/Common/SharedBlockRowRef.h +++ b/dbms/src/Common/SharedBlockRowRef.h @@ -79,27 +79,8 @@ struct SharedBlockRowRef bool empty() const { return columns == nullptr; } size_t size() const { return empty() ? 0 : columns->size(); } - /// gets pointers to all columns of block, which were used for ORDER BY - static ColumnRawPtrs getBlockColumns(const Block & block, const SortDescription description) - { - size_t size = description.size(); - ColumnRawPtrs res; - res.reserve(size); - - for (size_t i = 0; i < size; ++i) - { - const IColumn * column = !description[i].column_name.empty() - ? block.getByName(description[i].column_name).column.get() - : block.safeGetByPosition(description[i].column_number).column.get(); - res.emplace_back(column); - } - - return res; - } - - - static void setSharedBlockRowRef(SharedBlockRowRef & row_ref, SharedBlockPtr & shared_block, ColumnRawPtrs * columns, - size_t row_num) + static void setSharedBlockRowRef(SharedBlockRowRef & row_ref, + SharedBlockPtr & shared_block,ColumnRawPtrs * columns, size_t row_num) { row_ref.row_num = row_num; row_ref.columns = columns; @@ -107,6 +88,4 @@ struct SharedBlockRowRef } }; - - -} +} \ No newline at end of file diff --git a/dbms/src/DataStreams/LimitBlockInputStream.cpp b/dbms/src/DataStreams/LimitBlockInputStream.cpp index 4dbecc4dcc7..ea794425bae 100644 --- a/dbms/src/DataStreams/LimitBlockInputStream.cpp +++ b/dbms/src/DataStreams/LimitBlockInputStream.cpp @@ -6,6 +6,24 @@ namespace DB { +/// gets pointers to all columns of block, which were used for ORDER BY +static ColumnRawPtrs getSortColumns(const Block & block, const SortDescription & description) +{ + size_t size = description.size(); + ColumnRawPtrs res; + res.reserve(size); + + for (size_t i = 0; i < size; ++i) + { + const IColumn * column = !description[i].column_name.empty() + ? block.getByName(description[i].column_name).column.get() + : block.safeGetByPosition(description[i].column_number).column.get(); + res.emplace_back(column); + } + + return res; +} + LimitBlockInputStream::LimitBlockInputStream( const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_, @@ -21,29 +39,30 @@ LimitBlockInputStream::LimitBlockInputStream( children.push_back(input); } - Block LimitBlockInputStream::readImpl() { Block res; UInt64 rows = 0; - /// pos >= offset + limit and all rows in previous block were equal to ties_row_ref - /// so we check current block - if (with_ties && ties_row_ref.shared_block) + /// pos >= offset + limit and all rows in the end of previous block were equal + /// to last row at 'limit' position. So we check current block. + if (!ties_row_ref.empty() && pos >= offset + limit) { - rows = res.rows(); - pos += rows; res = children.back()->read(); + rows = res.rows(); + if (!res) + return res; SharedBlockPtr ptr = new detail::SharedBlock(std::move(res)); - ColumnRawPtrs columns = SharedBlockRowRef::getBlockColumns(*ptr, description); - UInt64 len; + ptr->sort_columns = getSortColumns(*ptr, description); + UInt64 len; for (len = 0; len < rows; ++len) { SharedBlockRowRef current_row; - SharedBlockRowRef::setSharedBlockRowRef(current_row, ptr, &columns, len); + SharedBlockRowRef::setSharedBlockRowRef(current_row, ptr, &ptr->sort_columns, len); + if (current_row != ties_row_ref) { ties_row_ref.reset(); @@ -51,7 +70,7 @@ Block LimitBlockInputStream::readImpl() } } - if (len < rows - 1) + if (len < rows) { for (size_t i = 0; i < ptr->columns(); ++i) ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(0, len); @@ -60,8 +79,6 @@ Block LimitBlockInputStream::readImpl() return *ptr; } - /// pos - how many lines were read, including the last read block - if (pos >= offset + limit) { if (!always_read_till_end) @@ -83,10 +100,18 @@ Block LimitBlockInputStream::readImpl() pos += rows; } while (pos <= offset); + SharedBlockPtr ptr = new detail::SharedBlock(std::move(res)); + if (with_ties) + ptr->sort_columns = getSortColumns(*ptr, description); /// give away the whole block if (pos >= offset + rows && pos <= offset + limit) - return res; + { + /// Save rowref for last row, because probalbly next block begins with the same row. + if (with_ties && pos == offset + limit) + SharedBlockRowRef::setSharedBlockRowRef(ties_row_ref, ptr, &ptr->sort_columns, rows - 1); + return *ptr; + } /// give away a piece of the block UInt64 start = std::max( @@ -98,18 +123,16 @@ Block LimitBlockInputStream::readImpl() static_cast(pos) - static_cast(offset), static_cast(limit) + static_cast(offset) - static_cast(pos) + static_cast(rows))); - SharedBlockPtr ptr = new detail::SharedBlock(std::move(res)); /// check if other rows in current block equals to last one in limit if (with_ties) { - ColumnRawPtrs columns = SharedBlockRowRef::getBlockColumns(*ptr, description); - SharedBlockRowRef::setSharedBlockRowRef(ties_row_ref, ptr, &columns, start + length - 1); + SharedBlockRowRef::setSharedBlockRowRef(ties_row_ref, ptr, &ptr->sort_columns, start + length - 1); for (size_t i = ties_row_ref.row_num + 1; i < rows; ++i) { SharedBlockRowRef current_row; - SharedBlockRowRef::setSharedBlockRowRef(current_row, ptr, &columns, i); + SharedBlockRowRef::setSharedBlockRowRef(current_row, ptr, &ptr->sort_columns, i); if (current_row == ties_row_ref) ++length; else @@ -120,6 +143,9 @@ Block LimitBlockInputStream::readImpl() } } + if (start + length == rows) + return *ptr; + for (size_t i = 0; i < ptr->columns(); ++i) ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(start, length); diff --git a/dbms/src/DataStreams/LimitBlockInputStream.h b/dbms/src/DataStreams/LimitBlockInputStream.h index 763cefc63d8..6c5f76cdaaf 100644 --- a/dbms/src/DataStreams/LimitBlockInputStream.h +++ b/dbms/src/DataStreams/LimitBlockInputStream.h @@ -18,7 +18,7 @@ public: * If always_read_till_end = true - reads all the data to the end, but ignores them. This is necessary in rare cases: * when otherwise, due to the cancellation of the request, we would not have received the data for GROUP BY WITH TOTALS from the remote server. * If use_limit_as_total_rows_approx = true, then addTotalRowsApprox is called to use the limit in progress & stats - * with_ties = true, when query has WITH TIES modifier. If so,description should be provided + * with_ties = true, when query has WITH TIES modifier. If so, description should be provided * description lets us know which row we should check for equality */ LimitBlockInputStream( diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 9cc4a9b4bb4..d94aa47fec3 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -719,7 +719,7 @@ static std::pair getLimitLengthAndOffset(const ASTSelectQuery & static UInt64 getLimitForSorting(const ASTSelectQuery & query, const Context & context) { /// Partial sort can be done if there is LIMIT but no DISTINCT or LIMIT BY. - if (!query.distinct && !query.limitBy()) + if (!query.distinct && !query.limitBy() && !query.limit_with_ties) { auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context); return limit_length + limit_offset; @@ -2238,7 +2238,13 @@ void InterpreterSelectQuery::executePreLimit(Pipeline & pipeline) if (query.limitLength()) { auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context); - SortDescription sort_descr = getSortDescription(query, context); + SortDescription sort_descr; + if (query.limit_with_ties) + { + if (!query.orderBy()) + throw Exception("LIMIT WITH TIES without ORDER BY", ErrorCodes::LOGICAL_ERROR); + sort_descr = getSortDescription(query, context); + } pipeline.transform([&, limit = limit_length + limit_offset](auto & stream) { stream = std::make_shared(stream, limit, 0, false, false, query.limit_with_ties, sort_descr);