From 8c2060b48a8c34e5e9988bd1295c89b3a0943d68 Mon Sep 17 00:00:00 2001 From: CurtizJ Date: Thu, 4 Oct 2018 17:55:02 +0300 Subject: [PATCH] wip on finish sorting --- .../MergeSortingBlockInputStream.cpp | 70 +++++++++++-------- .../MergeSortingBlockInputStream.h | 2 +- dbms/src/Interpreters/sortBlock.cpp | 21 ++++++ dbms/src/Interpreters/sortBlock.h | 20 ------ 4 files changed, 63 insertions(+), 50 deletions(-) diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 28c8c6c2094..198384cd7a3 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -319,17 +319,28 @@ FinishMergeSortingBlockInputStream::FinishMergeSortingBlockInputStream( removeConstantsFromSortDescription(header, description_to_sort); } -static bool equalKeysAt(const ColumnsWithSortDescriptions & lhs, const ColumnsWithSortDescriptions & rhs, size_t n, size_t m) -{ - for (auto it = lhs.begin(), jt = rhs.begin(); it != lhs.end(); ++it, ++jt) +struct Less +{ + const ColumnsWithSortDescriptions & left_columns; + const ColumnsWithSortDescriptions & right_columns; + + Less(const ColumnsWithSortDescriptions & left_columns_, const ColumnsWithSortDescriptions & right_columns_) : + left_columns(left_columns_), right_columns(right_columns_) {} + + bool operator() (size_t a, size_t b) const { - int res = it->first->compareAt(n, m, *jt->first, it->second.nulls_direction); - if (res != 0) - return false; + for (auto it = left_columns.begin(), jt = right_columns.begin(); it != left_columns.end(); ++it, ++jt) + { + int res = it->second.direction * it->first->compareAt(a, b, *jt->first, it->second.nulls_direction); + if (res < 0) + return true; + else if (res > 0) + return false; + } + return false; } - return true; -} +}; Block FinishMergeSortingBlockInputStream::readImpl() { @@ -356,7 +367,7 @@ Block FinishMergeSortingBlockInputStream::readImpl() { block = children.back()->read(); - /// End of input stream, but we can`t returns immediatly, we need to merge already read blocks. + /// End of input stream, but we can`t return immediatly, we need to merge already read blocks. /// Check it later, when get end of stream from impl. if (!block) { @@ -373,35 +384,36 @@ Block FinishMergeSortingBlockInputStream::readImpl() if (size == 0) continue; - auto columns_with_sort_desc = getColumnsWithSortDescription(block, description_sorted); - removeConstantsFromBlock(block); - /// May be new block starts with new key. + /// Find the position of last already read key in current block. if (!blocks.empty()) { const Block & last_block = blocks.back(); - if (!equalKeysAt(getColumnsWithSortDescription(last_block, description_sorted), columns_with_sort_desc, last_block.rows() - 1, 0)) + auto last_columns = getColumnsWithSortDescription(last_block, description_sorted); + auto current_columns = getColumnsWithSortDescription(block, description_sorted); + + Less less(last_columns, current_columns); + + IColumn::Permutation perm(size); + for (size_t i = 0; i < size; ++i) + perm[i] = i; + + auto it = std::upper_bound(perm.begin(), perm.end(), last_block.rows() - 1, less); + if (it != perm.end()) + { + tail_pos = it - perm.begin(); break; + } } - - IColumn::Permutation perm(size); - for (size_t i = 0; i < size; ++i) - perm[i] = i; - - PartialSortingLess less(columns_with_sort_desc); - - /// We need to save tail of block, because next block may starts with the same key as in tail - /// and we should sort these rows in one chunk. - tail_pos = *std::lower_bound(perm.begin(), perm.end(), size - 1, less); - - if (tail_pos != 0) - break; - /// If we reach here, that means that current block has all rows with the same key as tail of a previous block. + /// If we reach here, that means that current block is first in chunk + /// or it all consists of rows with the same key as tail of a previous block. blocks.push_back(block); } + /// We need to save tail of block, because next block may starts with the same key as in tail + /// and we should sort these rows in one chunk. if (block) { Block head_block = block.cloneEmpty(); @@ -411,8 +423,8 @@ Block FinishMergeSortingBlockInputStream::readImpl() head_block.getByPosition(i).column = block.getByPosition(i).column->cut(0, tail_pos); tail_block.getByPosition(i).column = block.getByPosition(i).column->cut(tail_pos, block.rows() - tail_pos); } - - blocks.push_back(head_block); + if (head_block.rows()) + blocks.push_back(head_block); } impl = std::make_unique(blocks, description_to_sort, max_merged_block_size, limit); diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.h b/dbms/src/DataStreams/MergeSortingBlockInputStream.h index 8a3bdda2750..076c19dc0a0 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.h +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.h @@ -133,7 +133,7 @@ private: /** Takes stream already sorted by `x` and finishes sorting it by (`x`, `y`). - * During sorting only blocks with rows equal by `x` saved in RAM. + * During sorting only blocks with rows that equal by `x` saved in RAM. * */ class FinishMergeSortingBlockInputStream : public IProfilingBlockInputStream { diff --git a/dbms/src/Interpreters/sortBlock.cpp b/dbms/src/Interpreters/sortBlock.cpp index 9258857a0d4..40c98dd7cd5 100644 --- a/dbms/src/Interpreters/sortBlock.cpp +++ b/dbms/src/Interpreters/sortBlock.cpp @@ -44,6 +44,27 @@ ColumnsWithSortDescriptions getColumnsWithSortDescription(const Block & block, c } +struct PartialSortingLess +{ + const ColumnsWithSortDescriptions & columns; + + explicit PartialSortingLess(const ColumnsWithSortDescriptions & columns_) : columns(columns_) {} + + bool operator() (size_t a, size_t b) const + { + for (ColumnsWithSortDescriptions::const_iterator it = columns.begin(); it != columns.end(); ++it) + { + int res = it->second.direction * it->first->compareAt(a, b, *it->first, it->second.nulls_direction); + if (res < 0) + return true; + else if (res > 0) + return false; + } + return false; + } +}; + + struct PartialSortingLessWithCollation { const ColumnsWithSortDescriptions & columns; diff --git a/dbms/src/Interpreters/sortBlock.h b/dbms/src/Interpreters/sortBlock.h index cfb36ed5e4b..d6bce9a715d 100644 --- a/dbms/src/Interpreters/sortBlock.h +++ b/dbms/src/Interpreters/sortBlock.h @@ -31,26 +31,6 @@ bool isAlreadySorted(const Block & block, const SortDescription & description); using ColumnsWithSortDescriptions = std::vector>; -struct PartialSortingLess -{ - const ColumnsWithSortDescriptions & columns; - - explicit PartialSortingLess(const ColumnsWithSortDescriptions & columns_) : columns(columns_) {} - - bool operator() (size_t a, size_t b) const - { - for (ColumnsWithSortDescriptions::const_iterator it = columns.begin(); it != columns.end(); ++it) - { - int res = it->second.direction * it->first->compareAt(a, b, *it->first, it->second.nulls_direction); - if (res < 0) - return true; - else if (res > 0) - return false; - } - return false; - } -}; - ColumnsWithSortDescriptions getColumnsWithSortDescription(const Block & block, const SortDescription & description); }