diff --git a/dbms/include/DB/DataStreams/MergeSortingBlockInputStream.h b/dbms/include/DB/DataStreams/MergeSortingBlockInputStream.h index 360e08500b7..74252f9d4dd 100644 --- a/dbms/include/DB/DataStreams/MergeSortingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/MergeSortingBlockInputStream.h @@ -17,17 +17,17 @@ namespace DB { -/** Соединяет поток сортированных по отдельности блоков в сортированный целиком поток. - * Если данных для сортировки слишком много - может использовать внешнюю сортировку, с помощью временных файлов. +/** Merges stream of sorted each-separately blocks to sorted as-a-whole stream of blocks. + * If data to sort is too much, could use external sorting, with temporary files. */ -/** Часть реализации. Сливает набор готовых (уже прочитанных откуда-то) блоков. - * Возвращает результат слияния в виде потока блоков не более max_merged_block_size строк. +/** Part of implementation. Merging array of ready (already read from somewhere) blocks. + * Returns result of merge as stream of blocks, not more than 'max_merged_block_size' rows in each. */ class MergeSortingBlocksBlockInputStream : public IProfilingBlockInputStream { public: - /// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке. + /// limit - if not 0, allowed to return just first 'limit' rows in sorted order. MergeSortingBlocksBlockInputStream(Blocks & blocks_, SortDescription & description_, size_t max_merged_block_size_, size_t limit_ = 0); @@ -52,8 +52,8 @@ private: std::priority_queue queue; std::priority_queue queue_with_collation; - /** Делаем поддержку двух разных курсоров - с Collation и без. - * Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций. + /** Two different cursors are supported - with and without Collation. + * Templates are used (instead of virtual functions in SortCursor) for zero-overhead. */ template Block mergeImpl(std::priority_queue & queue); @@ -63,7 +63,7 @@ private: class MergeSortingBlockInputStream : public IProfilingBlockInputStream { public: - /// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке. + /// limit - if not 0, allowed to return just first 'limit' rows in sorted order. MergeSortingBlockInputStream(BlockInputStreamPtr input_, SortDescription & description_, size_t max_merged_block_size_, size_t limit_, size_t max_bytes_before_external_sort_, const std::string & tmp_path_) @@ -104,10 +104,15 @@ private: size_t sum_bytes_in_blocks = 0; std::unique_ptr impl; - /// Всё ниже - для внешней сортировки. + /// Before operation, will remove constant columns from blocks. And after, place constant columns back. + /// (to avoid excessive virtual function calls and because constants cannot be serialized in Native format for temporary files) + /// Save original block structure here. + Block sample_block; + + /// Everything below is for external sorting. std::vector> temporary_files; - /// Для чтения сброшенных во временный файл данных. + /// For reading data from temporary file. struct TemporaryFileStream { ReadBufferFromFile file_in; diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index c914cdeb3a1..bc5e5559842 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -10,26 +10,81 @@ namespace DB { +/** Remove constant columns from block. + */ +static void removeConstantsFromBlock(Block & block) +{ + size_t columns = block.columns(); + size_t i = 0; + while (i < columns) + { + if (block.unsafeGetByPosition(i).column->isConst()) + { + block.erase(i); + --columns; + } + else + ++i; + } +} + +static void removeConstantsFromSortDescription(const Block & sample_block, SortDescription & description) +{ + description.erase(std::remove_if(description.begin(), description.end(), + [&](const SortColumnDescription & elem) + { + if (!elem.column_name.empty()) + return sample_block.getByName(elem.column_name).column->isConst(); + else + return sample_block.getByPosition(elem.column_number).column->isConst(); + }), description.end()); +} + +/** Add into block, whose constant columns was removed by previous function, + * constant columns from sample_block (which must have structure as before removal of constants from block). + */ +static void enrichBlockWithConstants(Block & block, const Block & sample_block) +{ + size_t rows = block.rowsInFirstColumn(); + size_t columns = sample_block.columns(); + + for (size_t i = 0; i < columns; ++i) + { + const auto & col_type_name = sample_block.unsafeGetByPosition(i); + if (col_type_name.column->isConst()) + block.insert(i, {col_type_name.column->cloneResized(rows), col_type_name.type, col_type_name.name}); + } +} + + Block MergeSortingBlockInputStream::readImpl() { - /** Алгоритм: - * - читать в оперативку блоки из источника; - * - когда их становится слишком много и если возможна внешняя сортировка - * - слить блоки вместе в сортированный поток и записать его во временный файл; - * - в конце, слить вместе все сортированные потоки из временных файлов, а также из накопившихся в оперативке блоков. + /** Algorithm: + * - read to memory blocks from source stream; + * - if too much of them and if external sorting is enabled, + * - merge all blocks to sorted stream and write it to temporary file; + * - at the end, merge all sorted streams from temporary files and also from rest of blocks in memory. */ - /// Ещё не прочитали блоки. + /// If has not read source blocks. if (!impl) { while (Block block = children.back()->read()) { + if (!sample_block) + { + sample_block = block.cloneEmpty(); + removeConstantsFromSortDescription(sample_block, description); + } + + removeConstantsFromBlock(block); + blocks.push_back(block); sum_bytes_in_blocks += block.bytes(); - /** Если блоков стало слишком много и возможна внешняя сортировка, - * то сольём вместе те блоки, которые успели накопиться, и сбросим сортированный поток во временный (сжатый) файл. - * NOTE. Возможно - проверка наличия свободного места на жёстком диске. + /** If too much of them and if external sorting is enabled, + * will merge blocks that we have in memory at this moment and write merged stream to temporary (compressed) file. + * NOTE. It's possible to check free space in filesystem. */ if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort) { @@ -42,7 +97,7 @@ Block MergeSortingBlockInputStream::readImpl() LOG_INFO(log, "Sorting and writing part of data into temporary file " + path); ProfileEvents::increment(ProfileEvents::ExternalSortWritePart); - copyData(block_in, block_out, &is_cancelled); /// NOTE. Возможно, ограничение на потребление места на дисках. + copyData(block_in, block_out, &is_cancelled); /// NOTE. Possibly limit disk usage. LOG_INFO(log, "Done writing part of data into temporary file " + path); blocks.clear(); @@ -59,28 +114,31 @@ Block MergeSortingBlockInputStream::readImpl() } else { - /// Если были сброшены временные данные в файлы. + /// If there was temporary files. ProfileEvents::increment(ProfileEvents::ExternalSortMerge); LOG_INFO(log, "There are " << temporary_files.size() << " temporary sorted parts to merge."); - /// Сформируем сортированные потоки для слияния. + /// Create sorted streams to merge. for (const auto & file : temporary_files) { temporary_inputs.emplace_back(std::make_unique(file->path())); inputs_to_merge.emplace_back(temporary_inputs.back()->block_in); } - /// Оставшиеся в оперативке блоки. + /// Rest of blocks in memory. if (!blocks.empty()) inputs_to_merge.emplace_back(std::make_shared(blocks, description, max_merged_block_size, limit)); - /// Будем сливать эти потоки. + /// Will merge that sorted streams. impl = std::make_unique(inputs_to_merge, description, max_merged_block_size, limit); } } - return impl->read(); + Block res = impl->read(); + if (res) + enrichBlockWithConstants(res, sample_block); + return res; } @@ -142,7 +200,7 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queuegetByPosition(i).name != merged_block.getByPosition(i).name || shared_block_ptr->getByPosition(i).type->getName() != merged_block.getByPosition(i).type->getName() || shared_block_ptr->getByPosition(i).column->getName() != merged_block.getByPosition(i).column->getName()) - throw Exception("Merging blocks has different names or types of columns", ErrorCodes::BLOCKS_HAS_DIFFERENT_STRUCTURE); + { + throw Exception("Merging blocks has different names or types of columns:\n" + + shared_block_ptr->dumpStructure() + "\nand\n" + merged_block.dumpStructure(), + ErrorCodes::BLOCKS_HAS_DIFFERENT_STRUCTURE); + } + } } for (size_t i = 0; i < num_columns; ++i) diff --git a/dbms/tests/queries/0_stateless/00352_external_sorting_and_constants.reference b/dbms/tests/queries/0_stateless/00352_external_sorting_and_constants.reference new file mode 100644 index 00000000000..c3b8e5c0844 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00352_external_sorting_and_constants.reference @@ -0,0 +1,40 @@ +999990 Hello +999991 Hello +999992 Hello +999993 Hello +999994 Hello +999995 Hello +999996 Hello +999997 Hello +999998 Hello +999999 Hello +999990 Hello +999991 Hello +999992 Hello +999993 Hello +999994 Hello +999995 Hello +999996 Hello +999997 Hello +999998 Hello +999999 Hello +999990 Hello +999991 Hello +999992 Hello +999993 Hello +999994 Hello +999995 Hello +999996 Hello +999997 Hello +999998 Hello +999999 Hello +999990 Hello +999991 Hello +999992 Hello +999993 Hello +999994 Hello +999995 Hello +999996 Hello +999997 Hello +999998 Hello +999999 Hello diff --git a/dbms/tests/queries/0_stateless/00352_external_sorting_and_constants.sql b/dbms/tests/queries/0_stateless/00352_external_sorting_and_constants.sql new file mode 100644 index 00000000000..136161f597d --- /dev/null +++ b/dbms/tests/queries/0_stateless/00352_external_sorting_and_constants.sql @@ -0,0 +1,4 @@ +SELECT number, 'Hello' AS k FROM (SELECT number FROM system.numbers LIMIT 1000000) ORDER BY number LIMIT 999990, 100 SETTINGS max_bytes_before_external_sort = 1000000; +SELECT number, 'Hello' AS k FROM (SELECT number FROM system.numbers LIMIT 1000000) ORDER BY number, k LIMIT 999990, 100 SETTINGS max_bytes_before_external_sort = 1000000; +SELECT number, 'Hello' AS k FROM (SELECT number FROM system.numbers LIMIT 1000000) ORDER BY k, number, k LIMIT 999990, 100 SETTINGS max_bytes_before_external_sort = 1000000; +SELECT number, 'Hello' AS k FROM (SELECT number FROM system.numbers LIMIT 1000000) ORDER BY number, k, number LIMIT 999990, 100 SETTINGS max_bytes_before_external_sort = 1000000;