diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index d65b4492de1..4dad3f4abf4 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -269,6 +269,7 @@ namespace ErrorCodes UNION_ALL_COLUMN_ALIAS_MISMATCH, CLIENT_OUTPUT_FORMAT_SPECIFIED, UNKNOWN_BLOCK_INFO_FIELD, + BAD_COLLATION, POCO_EXCEPTION = 1000, STD_EXCEPTION, diff --git a/dbms/include/DB/Core/SortDescription.h b/dbms/include/DB/Core/SortDescription.h index 5d16a6a1856..8f65a5d15ce 100644 --- a/dbms/include/DB/Core/SortDescription.h +++ b/dbms/include/DB/Core/SortDescription.h @@ -50,9 +50,9 @@ struct SortCursorImpl ConstColumnPlainPtrs all_columns; ConstColumnPlainPtrs sort_columns; SortDescription desc; - size_t sort_columns_size; - size_t pos; - size_t rows; + size_t sort_columns_size = 0; + size_t pos = 0; + size_t rows = 0; /** Порядок (что сравнивается), если сравниваемые столбцы равны. * Даёт возможность предпочитать строки из нужного курсора. @@ -65,12 +65,12 @@ struct SortCursorImpl NeedCollationFlags need_collation; /** Есть ли хотя бы один столбец с Collator. */ - bool has_collation; + bool has_collation = false; - SortCursorImpl() : sort_columns(0), pos(0), rows(0) {} + SortCursorImpl() {} SortCursorImpl(const Block & block, const SortDescription & desc_, size_t order_ = 0) - : desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size()), has_collation(false) + : desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size()) { reset(block); } diff --git a/dbms/include/DB/DataStreams/MergeSortingBlockInputStream.h b/dbms/include/DB/DataStreams/MergeSortingBlockInputStream.h index e188243c21b..2a322d5cc3f 100644 --- a/dbms/include/DB/DataStreams/MergeSortingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/MergeSortingBlockInputStream.h @@ -1,5 +1,7 @@ #pragma once +#include + #include #include @@ -12,12 +14,52 @@ namespace DB /** Соединяет поток сортированных по отдельности блоков в сортированный целиком поток. */ + +/** Часть реализации. Сливает набор готовых (уже прочитанных откуда-то) блоков. + * Возвращает результат слияния в виде потока блоков не более max_merged_block_size строк. + */ +class MergeSortingBlocksBlockInputStream : public IProfilingBlockInputStream +{ +public: + /// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке. + MergeSortingBlocksBlockInputStream(Blocks & blocks_, SortDescription & description_, + size_t max_merged_block_size_, size_t limit_ = 0); + + String getName() const override { return "MergeSortingBlocksBlockInputStream"; } + String getID() const override { return getName(); } + +protected: + Block readImpl() override; + +private: + Blocks & blocks; + SortDescription description; + size_t max_merged_block_size; + size_t limit; + size_t total_merged_rows = 0; + + using CursorImpls = std::vector; + CursorImpls cursors; + + bool has_collation = false; + + std::priority_queue queue; + std::priority_queue queue_with_collation; + + /** Делаем поддержку двух разных курсоров - с Collation и без. + * Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций. + */ + template + Block mergeImpl(std::priority_queue & queue); +}; + + class MergeSortingBlockInputStream : public IProfilingBlockInputStream { public: /// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке. MergeSortingBlockInputStream(BlockInputStreamPtr input_, SortDescription & description_, size_t limit_ = 0) - : description(description_), limit(limit_), has_been_read(false), log(&Logger::get("MergeSortingBlockInputStream")) + : description(description_), limit(limit_) { children.push_back(input_); } @@ -43,22 +85,10 @@ private: SortDescription description; size_t limit; - /// Всё было прочитано. - bool has_been_read; + Logger * log = &Logger::get("MergeSortingBlockInputStream"); - Logger * log; - - /** Слить сразу много блоков с помощью priority queue. - */ - Block merge(Blocks & blocks); - - typedef std::vector CursorImpls; - - /** Делаем поддержку двух разных курсоров - с Collation и без. - * Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций. - */ - template - Block mergeImpl(Blocks & block, CursorImpls & cursors); + Blocks blocks; + std::unique_ptr impl; }; } diff --git a/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h index 27dd479dfb8..37a9d571bb7 100644 --- a/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h +++ b/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h @@ -20,8 +20,8 @@ class MergingSortedBlockInputStream : public IProfilingBlockInputStream public: /// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке. MergingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, size_t max_block_size_, size_t limit_ = 0) - : description(description_), max_block_size(max_block_size_), limit(limit_), total_merged_rows(0), first(true), has_collation(false), - num_columns(0), source_blocks(inputs_.size()), cursors(inputs_.size()), log(&Logger::get("MergingSortedBlockInputStream")) + : description(description_), max_block_size(max_block_size_), limit(limit_), + source_blocks(inputs_.size()), cursors(inputs_.size()) { children.insert(children.end(), inputs_.begin(), inputs_.end()); } @@ -65,14 +65,13 @@ protected: SortDescription description; size_t max_block_size; size_t limit; - size_t total_merged_rows; + size_t total_merged_rows = 0; - bool first; - - bool has_collation; + bool first = true; + bool has_collation = false; /// Текущие сливаемые блоки. - size_t num_columns; + size_t num_columns = 0; Blocks source_blocks; typedef std::vector CursorImpls; @@ -139,7 +138,7 @@ private: template void merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue & queue); - Logger * log; + Logger * log = &Logger::get("MergingSortedBlockInputStream"); }; } diff --git a/dbms/include/DB/DataStreams/OneBlockInputStream.h b/dbms/include/DB/DataStreams/OneBlockInputStream.h index b75ef5f898b..18f05415f12 100644 --- a/dbms/include/DB/DataStreams/OneBlockInputStream.h +++ b/dbms/include/DB/DataStreams/OneBlockInputStream.h @@ -16,7 +16,7 @@ using Poco::SharedPtr; class OneBlockInputStream : public IProfilingBlockInputStream { public: - OneBlockInputStream(const Block & block_) : block(block_), has_been_read(false) {} + OneBlockInputStream(const Block & block_) : block(block_) {} String getName() const override { return "OneBlockInputStream"; } @@ -39,7 +39,7 @@ protected: private: Block block; - bool has_been_read; + bool has_been_read = false; }; } diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index f56f528eb41..c45c6906ab8 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -1,14 +1,10 @@ -#include -#include - -#include - #include namespace DB { + Block MergeSortingBlockInputStream::readImpl() { /** Достаточно простой алгоритм: @@ -16,98 +12,83 @@ Block MergeSortingBlockInputStream::readImpl() * - объединить их всех; */ - if (has_been_read) - return Block(); + /// Ещё не прочитали блоки. + if (!impl) + { + while (Block block = children.back()->read()) + blocks.push_back(block); - has_been_read = true; + if (blocks.empty() || isCancelled()) + return Block(); - Blocks blocks; - while (Block block = children.back()->read()) - blocks.push_back(block); + impl.reset(new MergeSortingBlocksBlockInputStream(blocks, description, DEFAULT_BLOCK_SIZE, limit)); + } - if (isCancelled()) - return Block(); - - return merge(blocks); + return impl->read(); } -Block MergeSortingBlockInputStream::merge(Blocks & blocks) + +MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream( + Blocks & blocks_, SortDescription & description_, size_t max_merged_block_size_, size_t limit_) + : blocks(blocks_), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_) +{ + Blocks nonempty_blocks; + for (const auto & block : blocks) + { + if (block.rowsInFirstColumn() == 0) + continue; + + nonempty_blocks.push_back(block); + cursors.emplace_back(block, description); + has_collation |= cursors.back().has_collation; + } + + blocks.swap(nonempty_blocks); + + if (!has_collation) + { + for (size_t i = 0; i < cursors.size(); ++i) + queue.push(SortCursor(&cursors[i])); + } + else + { + for (size_t i = 0; i < cursors.size(); ++i) + queue_with_collation.push(SortCursorWithCollation(&cursors[i])); + } +} + + +Block MergeSortingBlocksBlockInputStream::readImpl() { if (blocks.empty()) return Block(); if (blocks.size() == 1) - return blocks[0]; - - Stopwatch watch; - - LOG_DEBUG(log, "Merge sorting"); - - CursorImpls cursors(blocks.size()); - - bool has_collation = false; - - size_t nonempty_blocks = 0; - for (Blocks::const_iterator it = blocks.begin(); it != blocks.end(); ++it) { - if (it->rowsInFirstColumn() == 0) - continue; - - cursors[nonempty_blocks] = SortCursorImpl(*it, description); - has_collation |= cursors[nonempty_blocks].has_collation; - - ++nonempty_blocks; + Block res = blocks[0]; + blocks.clear(); + return res; } - if (nonempty_blocks == 0) - return Block(); - - cursors.resize(nonempty_blocks); - - Block merged; - - if (has_collation) - merged = mergeImpl(blocks, cursors); - else - merged = mergeImpl(blocks, cursors); - - watch.stop(); - - size_t rows_before_merge = 0; - size_t bytes_before_merge = 0; - for (const auto & block : blocks) - { - rows_before_merge += block.rowsInFirstColumn(); - bytes_before_merge += block.bytes(); - } - - LOG_DEBUG(log, std::fixed << std::setprecision(2) - << "Merge sorted " << blocks.size() << " blocks, from " << rows_before_merge << " to " << merged.rows() << " rows" - << " in " << watch.elapsedSeconds() << " sec., " - << rows_before_merge / watch.elapsedSeconds() << " rows/sec., " - << bytes_before_merge / 1048576.0 / watch.elapsedSeconds() << " MiB/sec."); - - return merged; + return !has_collation + ? mergeImpl(queue) + : mergeImpl(queue_with_collation); } + template -Block MergeSortingBlockInputStream::mergeImpl(Blocks & blocks, CursorImpls & cursors) +Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue & queue) { Block merged = blocks[0].cloneEmpty(); size_t num_columns = blocks[0].columns(); - typedef std::priority_queue Queue; - Queue queue; - - for (size_t i = 0; i < cursors.size(); ++i) - queue.push(TSortCursor(&cursors[i])); - ColumnPlainPtrs merged_columns; for (size_t i = 0; i < num_columns; ++i) /// TODO: reserve - merged_columns.push_back(&*merged.getByPosition(i).column); + merged_columns.push_back(merged.getByPosition(i).column.get()); /// Вынимаем строки в нужном порядке и кладём в merged. - for (size_t row = 0; (!limit || row < limit) && !queue.empty(); ++row) + size_t merged_rows = 0; + while (!queue.empty()) { TSortCursor current = queue.top(); queue.pop(); @@ -120,9 +101,24 @@ Block MergeSortingBlockInputStream::mergeImpl(Blocks & blocks, CursorImpls & cur current->next(); queue.push(current); } + + ++total_merged_rows; + if (limit && total_merged_rows == limit) + { + blocks.clear(); + return merged; + } + + ++merged_rows; + if (merged_rows == max_merged_block_size) + return merged; } + if (merged_rows == 0) + merged.clear(); + return merged; } + } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index e0dc6af2e14..85b8821c6f8 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -852,7 +852,9 @@ void InterpreterSelectQuery::executeOrder(BlockInputStreams & streams) ++it) { String name = (*it)->children.front()->getColumnName(); - order_descr.push_back(SortColumnDescription(name, typeid_cast(**it).direction)); + const ASTOrderByElement & order_by_elem = typeid_cast(**it); + + order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.collator); } /// Если есть LIMIT и нет DISTINCT - можно делать частичную сортировку. diff --git a/dbms/src/Interpreters/sortBlock.cpp b/dbms/src/Interpreters/sortBlock.cpp index acb18f90d1e..f1b7405bd4f 100644 --- a/dbms/src/Interpreters/sortBlock.cpp +++ b/dbms/src/Interpreters/sortBlock.cpp @@ -9,7 +9,13 @@ typedef std::vector > ColumnsW static inline bool needCollation(const IColumn * column, const SortColumnDescription & description) { - return !description.collator.isNull() && column->getName() == "ColumnString"; + if (description.collator.isNull()) + return false; + + if (column->getName() != "ColumnString") + throw Exception("Collations could be specified only for String columns.", ErrorCodes::BAD_COLLATION); + + return true; } diff --git a/dbms/tests/queries/0_stateless/00105_collations.reference b/dbms/tests/queries/0_stateless/00105_collations.reference new file mode 100644 index 00000000000..3ff09ff2f2d --- /dev/null +++ b/dbms/tests/queries/0_stateless/00105_collations.reference @@ -0,0 +1,250 @@ +Ё +А +Я +а +я +ё +а +А +ё +Ё +я +Я +а +а +А +А +ё +ё +Ё +Ё +я +я +Я +Я +A +A +B +B +C +C +D +D +E +E +F +F +G +G +H +H +I +I +J +J +K +K +L +L +M +M +N +N +O +O +P +P +Q +R +R +S +S +T +T +U +U +V +V +W +X +Y +Y +Z +Z +a +a +b +b +c +c +d +d +e +e +f +f +g +g +h +h +i +i +j +j +k +k +l +l +m +m +n +n +o +o +p +p +q +r +r +s +s +t +t +u +u +v +v +w +x +y +y +z +z +Ç +Ö +Ü +ç +ö +ü +Ğ +ğ +İ +ı +Ş +ş +a +a +A +A +b +b +B +B +c +c +C +C +ç +Ç +d +d +D +D +e +e +E +E +f +f +F +F +g +g +G +G +ğ +Ğ +h +h +H +H +ı +I +I +i +i +İ +j +j +J +J +k +k +K +K +l +l +L +L +m +m +M +M +n +n +N +N +o +o +O +O +ö +Ö +p +p +P +P +q +Q +r +r +R +R +s +s +S +S +ş +Ş +t +t +T +T +u +u +U +U +ü +Ü +v +v +V +V +w +W +x +X +y +y +Y +Y +z +z +Z +Z +а 1 +А 4 +ё 3 +Ё 6 +я 2 +Я 5 diff --git a/dbms/tests/queries/0_stateless/00105_collations.sql b/dbms/tests/queries/0_stateless/00105_collations.sql new file mode 100644 index 00000000000..beb55d90d89 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00105_collations.sql @@ -0,0 +1,6 @@ +SELECT arrayJoin(['а', 'я', 'ё', 'А', 'Я', 'Ё']) AS x ORDER BY x; +SELECT arrayJoin(['а', 'я', 'ё', 'А', 'Я', 'Ё']) AS x ORDER BY x COLLATE 'ru'; +SELECT arrayJoin(['а', 'я', 'ё', 'А', 'Я', 'Ё']) AS x FROM remote('127.0.0.{1,2}', system, one) ORDER BY x COLLATE 'ru'; +SELECT arrayJoin(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'ç', 'd', 'e', 'f', 'g', 'ğ', 'h', 'ı', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'ö', 'p', 'r', 's', 'ş', 't', 'u', 'ü', 'v', 'y', 'z', 'A', 'B', 'C', 'Ç', 'D', 'E', 'F', 'G', 'Ğ', 'H', 'I', 'İ', 'J', 'K', 'L', 'M', 'N', 'O', 'Ö', 'P', 'R', 'S', 'Ş', 'T', 'U', 'Ü', 'V', 'Y', 'Z']) AS x ORDER BY x; +SELECT arrayJoin(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'ç', 'd', 'e', 'f', 'g', 'ğ', 'h', 'ı', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'ö', 'p', 'r', 's', 'ş', 't', 'u', 'ü', 'v', 'y', 'z', 'A', 'B', 'C', 'Ç', 'D', 'E', 'F', 'G', 'Ğ', 'H', 'I', 'İ', 'J', 'K', 'L', 'M', 'N', 'O', 'Ö', 'P', 'R', 'S', 'Ş', 'T', 'U', 'Ü', 'V', 'Y', 'Z']) AS x ORDER BY x COLLATE 'tr'; +SELECT x, n FROM (SELECT ['а', 'я', 'ё', 'А', 'Я', 'Ё'] AS arr) ARRAY JOIN arr AS x, arrayEnumerate(arr) AS n ORDER BY x COLLATE 'ru', n;