From 286c65e89ec29bf1ce4a2624e05f56662fbd675e Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Fri, 28 Feb 2020 23:20:39 +0300 Subject: [PATCH] fix LIMIT WITH TIES --- dbms/src/Processors/LimitTransform.cpp | 67 ++++++++++--------- dbms/src/Processors/LimitTransform.h | 7 +- dbms/src/Processors/SharedChunk.h | 62 +++-------------- .../Transforms/MergingSortedTransform.h | 2 +- .../00996_limit_with_ties.reference | 2 + .../0_stateless/00996_limit_with_ties.sql | 5 +- 6 files changed, 55 insertions(+), 90 deletions(-) diff --git a/dbms/src/Processors/LimitTransform.cpp b/dbms/src/Processors/LimitTransform.cpp index 7bf61a2a9bc..03a5c1ef44d 100644 --- a/dbms/src/Processors/LimitTransform.cpp +++ b/dbms/src/Processors/LimitTransform.cpp @@ -23,6 +23,18 @@ LimitTransform::LimitTransform( } } +SharedChunkPtr LimitTransform::makeChunkWithPreviousRow(const Chunk & chunk, size_t row) const +{ + assert(row < chunk.getNumRows()); + auto last_row_columns = chunk.cloneEmptyColumns(); + const auto & current_columns = chunk.getColumns(); + for (size_t i = 0; i < current_columns.size(); ++i) + last_row_columns[i]->insertFrom(*current_columns[i], row); + SharedChunkPtr shared_chunk = std::make_shared(Chunk(std::move(last_row_columns), 1)); + shared_chunk->sort_columns = extractSortColumns(shared_chunk->getColumns()); + return shared_chunk; +} + LimitTransform::Status LimitTransform::prepare() { @@ -53,7 +65,7 @@ LimitTransform::Status LimitTransform::prepare() } /// Check if we are done with pushing. - bool pushing_is_finished = (rows_read >= offset + limit) && ties_row_ref.empty(); + bool pushing_is_finished = (rows_read >= offset + limit) && !previous_row_chunk; if (pushing_is_finished) { if (!always_read_till_end) @@ -125,12 +137,9 @@ LimitTransform::Status LimitTransform::prepare() if (output.hasData()) return Status::PortFull; + /// Save the last row of current block to check if next block begins with the same row (for WITH TIES). if (with_ties && rows_read == offset + limit) - { - SharedChunkPtr shared_chunk = new detail::SharedChunk(current_chunk.clone()); - shared_chunk->sort_columns = extractSortColumns(shared_chunk->getColumns()); - ties_row_ref.set(shared_chunk, &shared_chunk->sort_columns, shared_chunk->getNumRows() - 1); - } + previous_row_chunk = makeChunkWithPreviousRow(current_chunk, current_chunk.getNumRows() - 1); output.push(std::move(current_chunk)); has_block = false; @@ -138,8 +147,9 @@ LimitTransform::Status LimitTransform::prepare() return Status::PortFull; } + bool may_need_more_data_for_ties = previous_row_chunk || rows_read - rows <= offset + limit; /// No more data is needed. - if (!always_read_till_end && rows_read >= offset + limit) + if (!always_read_till_end && (rows_read >= offset + limit) && !may_need_more_data_for_ties) input.close(); return Status::Ready; @@ -148,36 +158,32 @@ LimitTransform::Status LimitTransform::prepare() void LimitTransform::work() { - SharedChunkPtr shared_chunk = new detail::SharedChunk(std::move(current_chunk)); + SharedChunkPtr shared_chunk = std::make_shared(std::move(current_chunk)); shared_chunk->sort_columns = extractSortColumns(shared_chunk->getColumns()); size_t num_rows = shared_chunk->getNumRows(); size_t num_columns = shared_chunk->getNumColumns(); - if (!ties_row_ref.empty() && rows_read >= offset + limit) + if (previous_row_chunk && rows_read >= offset + limit) { - UInt64 len; - for (len = 0; len < num_rows; ++len) + /// Scan until the first row, which is not equal to previous_row_chunk (for WITH TIES) + size_t current_row_num = 0; + for (; current_row_num < num_rows; ++current_row_num) { - SharedChunkRowRef current_row; - current_row.set(shared_chunk, &shared_chunk->sort_columns, len); - - if (current_row != ties_row_ref) - { - ties_row_ref.reset(); + if (!shared_chunk->sortColumnsEqualAt(current_row_num, 0, *previous_row_chunk)) break; - } } auto columns = shared_chunk->detachColumns(); - if (len < num_rows) + if (current_row_num < num_rows) { + previous_row_chunk = {}; for (size_t i = 0; i < num_columns; ++i) - columns[i] = columns[i]->cut(0, len); + columns[i] = columns[i]->cut(0, current_row_num); } - current_chunk.setColumns(std::move(columns), len); + current_chunk.setColumns(std::move(columns), current_row_num); block_processed = true; return; } @@ -193,22 +199,21 @@ void LimitTransform::work() static_cast(limit) + static_cast(offset) - static_cast(rows_read) + static_cast(num_rows))); /// check if other rows in current block equals to last one in limit - if (with_ties) + if (with_ties && length) { - ties_row_ref.set(shared_chunk, &shared_chunk->sort_columns, start + length - 1); - SharedChunkRowRef current_row; + size_t current_row_num = start + length; + previous_row_chunk = makeChunkWithPreviousRow(*shared_chunk, current_row_num - 1); - for (size_t i = ties_row_ref.row_num + 1; i < num_rows; ++i) + for (; current_row_num < num_rows; ++current_row_num) { - current_row.set(shared_chunk, &shared_chunk->sort_columns, i); - if (current_row == ties_row_ref) - ++length; - else + if (!shared_chunk->sortColumnsEqualAt(current_row_num, 0, *previous_row_chunk)) { - ties_row_ref.reset(); + previous_row_chunk = {}; break; } } + + length = current_row_num - start; } if (length == num_rows) @@ -228,7 +233,7 @@ void LimitTransform::work() block_processed = true; } -ColumnRawPtrs LimitTransform::extractSortColumns(const Columns & columns) +ColumnRawPtrs LimitTransform::extractSortColumns(const Columns & columns) const { ColumnRawPtrs res; res.reserve(description.size()); diff --git a/dbms/src/Processors/LimitTransform.h b/dbms/src/Processors/LimitTransform.h index 3df5e3cc049..68193736f0e 100644 --- a/dbms/src/Processors/LimitTransform.h +++ b/dbms/src/Processors/LimitTransform.h @@ -13,6 +13,7 @@ private: InputPort & input; OutputPort & output; + size_t limit; size_t offset; size_t rows_read = 0; /// including the last read block @@ -26,10 +27,12 @@ private: bool with_ties; const SortDescription description; - SharedChunkRowRef ties_row_ref; + + SharedChunkPtr previous_row_chunk; std::vector sort_column_positions; - ColumnRawPtrs extractSortColumns(const Columns & columns); + SharedChunkPtr makeChunkWithPreviousRow(const Chunk & current_chunk, size_t row_num) const; + ColumnRawPtrs extractSortColumns(const Columns & columns) const; public: LimitTransform( diff --git a/dbms/src/Processors/SharedChunk.h b/dbms/src/Processors/SharedChunk.h index c6fe3c12f89..0b5c397740e 100644 --- a/dbms/src/Processors/SharedChunk.h +++ b/dbms/src/Processors/SharedChunk.h @@ -25,67 +25,21 @@ struct SharedChunk : Chunk ColumnRawPtrs sort_columns; SharedChunk(Chunk && chunk) : Chunk(std::move(chunk)) {} -}; - -} - -inline void intrusive_ptr_add_ref(detail::SharedChunk * ptr) -{ - ++ptr->refcount; -} - -inline void intrusive_ptr_release(detail::SharedChunk * ptr) -{ - if (0 == --ptr->refcount) - delete ptr; -} - -using SharedChunkPtr = boost::intrusive_ptr; -struct SharedChunkRowRef -{ - ColumnRawPtrs * columns = nullptr; - size_t row_num; - SharedChunkPtr shared_block; - - void swap(SharedChunkRowRef & other) + bool sortColumnsEqualAt(size_t row_num, size_t other_row_num, const detail::SharedChunk & other) const { - std::swap(columns, other.columns); - std::swap(row_num, other.row_num); - std::swap(shared_block, other.shared_block); - } - - /// The number and types of columns must match. - bool operator==(const SharedChunkRowRef & other) const - { - size_t size = columns->size(); + size_t size = sort_columns.size(); for (size_t i = 0; i < size; ++i) - if (0 != (*columns)[i]->compareAt(row_num, other.row_num, *(*other.columns)[i], 1)) + if (0 != sort_columns[i]->compareAt(row_num, other_row_num, *other.sort_columns[i], 1)) return false; return true; } - - bool operator!=(const SharedChunkRowRef & other) const - { - return !(*this == other); - } - - void reset() - { - SharedChunkRowRef empty; - swap(empty); - } - - bool empty() const { return columns == nullptr; } - size_t size() const { return empty() ? 0 : columns->size(); } - - void set(SharedChunkPtr & shared_block_, ColumnRawPtrs * columns_, size_t row_num_) - { - shared_block = shared_block_; - columns = columns_; - row_num = row_num_; - } }; +} + +using SharedChunkPtr = std::shared_ptr; + + } diff --git a/dbms/src/Processors/Transforms/MergingSortedTransform.h b/dbms/src/Processors/Transforms/MergingSortedTransform.h index 75de14b1ba0..318caadc3d6 100644 --- a/dbms/src/Processors/Transforms/MergingSortedTransform.h +++ b/dbms/src/Processors/Transforms/MergingSortedTransform.h @@ -146,7 +146,7 @@ private: if (!shared_chunk_ptr) { - shared_chunk_ptr = new detail::SharedChunk(std::move(chunk)); + shared_chunk_ptr = std::make_shared(std::move(chunk)); cursors[source_num] = SortCursorImpl(shared_chunk_ptr->getColumns(), description, source_num); has_collation |= cursors[source_num].has_collation; } diff --git a/dbms/tests/queries/0_stateless/00996_limit_with_ties.reference b/dbms/tests/queries/0_stateless/00996_limit_with_ties.reference index aa5d102bc9b..97e6c7682e5 100644 --- a/dbms/tests/queries/0_stateless/00996_limit_with_ties.reference +++ b/dbms/tests/queries/0_stateless/00996_limit_with_ties.reference @@ -50,3 +50,5 @@ 2 2 * +100 +100 diff --git a/dbms/tests/queries/0_stateless/00996_limit_with_ties.sql b/dbms/tests/queries/0_stateless/00996_limit_with_ties.sql index 3e4813bc6b5..88caec707d7 100644 --- a/dbms/tests/queries/0_stateless/00996_limit_with_ties.sql +++ b/dbms/tests/queries/0_stateless/00996_limit_with_ties.sql @@ -1,8 +1,6 @@ DROP TABLE IF EXISTS ties; CREATE TABLE ties (a Int) ENGINE = Memory; --- SET experimental_use_processors=1; - INSERT INTO ties VALUES (1), (1), (2), (2), (2), (2) (3), (3); SELECT a FROM ties order by a limit 1 with ties; @@ -32,4 +30,7 @@ SELECT '*'; SELECT a FROM ties order by a limit 3, 2 with ties; SELECT '*'; +select count() from (select number > 100 from numbers(2000) order by number > 100 limit 1, 7 with ties); --TODO replace "number > 100" with "number > 100 as n" +select count() from (select number, number < 100 from numbers(2000) order by number < 100 desc limit 10 with ties); + DROP TABLE ties;