diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 6122f54630d..28c8c6c2094 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -5,6 +5,7 @@ #include #include #include +#include namespace ProfileEvents @@ -304,4 +305,126 @@ void MergeSortingBlockInputStream::remerge() sum_bytes_in_blocks = new_sum_bytes_in_blocks; } + +FinishMergeSortingBlockInputStream::FinishMergeSortingBlockInputStream( + const BlockInputStreamPtr & input, SortDescription & description_sorted_, + SortDescription & description_to_sort_, + size_t max_merged_block_size_, size_t limit_) + : description_sorted(description_sorted_), description_to_sort(description_to_sort_), + max_merged_block_size(max_merged_block_size_), limit(limit_) +{ + children.push_back(input); + header = children.at(0)->getHeader(); + removeConstantsFromSortDescription(header, description_sorted); + removeConstantsFromSortDescription(header, description_to_sort); +} + +static bool equalKeysAt(const ColumnsWithSortDescriptions & lhs, const ColumnsWithSortDescriptions & rhs, size_t n, size_t m) +{ + + for (auto it = lhs.begin(), jt = rhs.begin(); it != lhs.end(); ++it, ++jt) + { + int res = it->first->compareAt(n, m, *jt->first, it->second.nulls_direction); + if (res != 0) + return false; + } + return true; +} + +Block FinishMergeSortingBlockInputStream::readImpl() +{ + if (limit && total_rows_processed == limit) + return {}; + + Block res; + if (impl) + res = impl->read(); + + /// If res block is empty, we finish sorting previous chunk of blocks. + if (!res) + { + if (end_of_stream) + return {}; + + blocks.clear(); + if (tail_block) + blocks.push_back(std::move(tail_block)); + + Block block; + size_t tail_pos = 0; + while (true) + { + block = children.back()->read(); + + /// End of input stream, but we can`t returns immediatly, we need to merge already read blocks. + /// Check it later, when get end of stream from impl. + if (!block) + { + end_of_stream = true; + break; + } + + // If there were only const columns in sort description, then there is no need to sort. + // Return the blocks as is. + if (description_to_sort.empty()) + return block; + + size_t size = block.rows(); + if (size == 0) + continue; + + auto columns_with_sort_desc = getColumnsWithSortDescription(block, description_sorted); + + removeConstantsFromBlock(block); + + /// May be new block starts with new key. + if (!blocks.empty()) + { + const Block & last_block = blocks.back(); + if (!equalKeysAt(getColumnsWithSortDescription(last_block, description_sorted), columns_with_sort_desc, last_block.rows() - 1, 0)) + break; + } + + IColumn::Permutation perm(size); + for (size_t i = 0; i < size; ++i) + perm[i] = i; + + PartialSortingLess less(columns_with_sort_desc); + + /// We need to save tail of block, because next block may starts with the same key as in tail + /// and we should sort these rows in one chunk. + tail_pos = *std::lower_bound(perm.begin(), perm.end(), size - 1, less); + + if (tail_pos != 0) + break; + + /// If we reach here, that means that current block has all rows with the same key as tail of a previous block. + blocks.push_back(block); + } + + if (block) + { + Block head_block = block.cloneEmpty(); + tail_block = block.cloneEmpty(); + for (size_t i = 0; i < block.columns(); ++i) + { + head_block.getByPosition(i).column = block.getByPosition(i).column->cut(0, tail_pos); + tail_block.getByPosition(i).column = block.getByPosition(i).column->cut(tail_pos, block.rows() - tail_pos); + } + + blocks.push_back(head_block); + } + + impl = std::make_unique(blocks, description_to_sort, max_merged_block_size, limit); + res = impl->read(); + } + + if (res) + enrichBlockWithConstants(res, header); + + total_rows_processed += res.rows(); + + return res; +} + } diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.h b/dbms/src/DataStreams/MergeSortingBlockInputStream.h index 9b7e1aa1139..8a3bdda2750 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.h +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.h @@ -131,4 +131,45 @@ private: bool remerge_is_useful = true; }; + +/** Takes stream already sorted by `x` and finishes sorting it by (`x`, `y`). + * During sorting only blocks with rows equal by `x` saved in RAM. + * */ +class FinishMergeSortingBlockInputStream : public IProfilingBlockInputStream +{ +public: + /// limit - if not 0, allowed to return just first 'limit' rows in sorted order. + FinishMergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_sorted_, + SortDescription & description_to_sort_, + size_t max_merged_block_size_, size_t limit_); + + String getName() const override { return "FinishMergeSorting"; } + + bool isSortedOutput() const override { return true; } + const SortDescription & getSortDescription() const override { return description_to_sort; } + + Block getHeader() const override { return header; } + +protected: + Block readImpl() override; + +private: + SortDescription description_sorted; + SortDescription description_to_sort; + size_t max_merged_block_size; + size_t limit; + + Block tail_block; + Blocks blocks; + + std::unique_ptr impl; + + /// Before operation, will remove constant columns from blocks. And after, place constant columns back. + /// to avoid excessive virtual function calls + /// Save original block structure here. + Block header; + + bool end_of_stream = false; + size_t total_rows_processed = 0; +}; } diff --git a/dbms/src/Interpreters/sortBlock.cpp b/dbms/src/Interpreters/sortBlock.cpp index 3cf903f1cf1..9258857a0d4 100644 --- a/dbms/src/Interpreters/sortBlock.cpp +++ b/dbms/src/Interpreters/sortBlock.cpp @@ -13,9 +13,19 @@ namespace ErrorCodes } -using ColumnsWithSortDescriptions = std::vector>; +static inline bool needCollation(const IColumn * column, const SortColumnDescription & description) +{ + if (!description.collator) + return false; -static ColumnsWithSortDescriptions getColumnsWithSortDescription(const Block & block, const SortDescription & description) + if (!typeid_cast(column)) /// TODO Nullable(String) + throw Exception("Collations could be specified only for String columns.", ErrorCodes::BAD_COLLATION); + + return true; +} + + +ColumnsWithSortDescriptions getColumnsWithSortDescription(const Block & block, const SortDescription & description) { size_t size = description.size(); ColumnsWithSortDescriptions res; @@ -34,38 +44,6 @@ static ColumnsWithSortDescriptions getColumnsWithSortDescription(const Block & b } -static inline bool needCollation(const IColumn * column, const SortColumnDescription & description) -{ - if (!description.collator) - return false; - - if (!typeid_cast(column)) /// TODO Nullable(String) - throw Exception("Collations could be specified only for String columns.", ErrorCodes::BAD_COLLATION); - - return true; -} - - -struct PartialSortingLess -{ - const ColumnsWithSortDescriptions & columns; - - explicit PartialSortingLess(const ColumnsWithSortDescriptions & columns_) : columns(columns_) {} - - bool operator() (size_t a, size_t b) const - { - for (ColumnsWithSortDescriptions::const_iterator it = columns.begin(); it != columns.end(); ++it) - { - int res = it->second.direction * it->first->compareAt(a, b, *it->first, it->second.nulls_direction); - if (res < 0) - return true; - else if (res > 0) - return false; - } - return false; - } -}; - struct PartialSortingLessWithCollation { const ColumnsWithSortDescriptions & columns; diff --git a/dbms/src/Interpreters/sortBlock.h b/dbms/src/Interpreters/sortBlock.h index 7cd4824a904..cfb36ed5e4b 100644 --- a/dbms/src/Interpreters/sortBlock.h +++ b/dbms/src/Interpreters/sortBlock.h @@ -29,4 +29,28 @@ void stableGetPermutation(const Block & block, const SortDescription & descripti */ bool isAlreadySorted(const Block & block, const SortDescription & description); +using ColumnsWithSortDescriptions = std::vector>; + +struct PartialSortingLess +{ + const ColumnsWithSortDescriptions & columns; + + explicit PartialSortingLess(const ColumnsWithSortDescriptions & columns_) : columns(columns_) {} + + bool operator() (size_t a, size_t b) const + { + for (ColumnsWithSortDescriptions::const_iterator it = columns.begin(); it != columns.end(); ++it) + { + int res = it->second.direction * it->first->compareAt(a, b, *it->first, it->second.nulls_direction); + if (res < 0) + return true; + else if (res > 0) + return false; + } + return false; + } +}; + +ColumnsWithSortDescriptions getColumnsWithSortDescription(const Block & block, const SortDescription & description); + }