From 2f98e779e18aab5cc096d013f2d0e4d76a52a34b Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Tue, 26 Feb 2019 21:40:08 +0300
Subject: [PATCH] Added MergingSortedTransform.

---
 dbms/src/Core/SortCursor.h                     |  33 +-
 dbms/src/DataStreams/processConstants.cpp      |  13 +-
 dbms/src/Processors/Chunk.cpp                  |  18 +
 dbms/src/Processors/Chunk.h                    |   4 +
 .../src/Processors/IAccumulatingTransform.cpp  |   9 +
 dbms/src/Processors/IAccumulatingTransform.h   |   3 +
 .../Transforms/MergeSortingTransform.cpp       | 331 ++++++++++++++++++
 .../Transforms/MergeSortingTransform.h         |  82 +++++
 .../Transforms/MergingSortedTransform.cpp      | 302 ++++++++++++++++
 .../Transforms/MergingSortedTransform.h        | 250 +++++++++++++
 .../Transforms/PartialSortingTransform.cpp     |  22 ++
 .../Transforms/PartialSortingTransform.h       |  27 ++
 12 files changed, 1081 insertions(+), 13 deletions(-)
 create mode 100644 dbms/src/Processors/Transforms/MergeSortingTransform.cpp
 create mode 100644 dbms/src/Processors/Transforms/MergeSortingTransform.h
 create mode 100644 dbms/src/Processors/Transforms/MergingSortedTransform.cpp
 create mode 100644 dbms/src/Processors/Transforms/MergingSortedTransform.h
 create mode 100644 dbms/src/Processors/Transforms/PartialSortingTransform.cpp
 create mode 100644 dbms/src/Processors/Transforms/PartialSortingTransform.h

diff --git a/dbms/src/Core/SortCursor.h b/dbms/src/Core/SortCursor.h
index cd427fe8ab1..f544fce6c1c 100644
--- a/dbms/src/Core/SortCursor.h
+++ b/dbms/src/Core/SortCursor.h
@@ -3,6 +3,7 @@
 #include
 #include
 #include
+#include
 #include
 #include

@@ -47,26 +48,44 @@ struct SortCursorImpl
         reset(block);
     }

+    SortCursorImpl(const Columns & columns, const SortDescription & desc_, size_t order_ = 0)
+        : desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size())
+    {
+        for (auto & column_desc : desc)
+        {
+            if (!column_desc.column_name.empty())
+                throw Exception("SortDescription should contain column positions if SortCursor is used without a header.",
+                                ErrorCodes::LOGICAL_ERROR);
+        }
+        reset(columns, {});
+    }
+
     bool empty() const { return rows == 0; }

     /// Set the cursor to the beginning of the new block.
     void reset(const Block & block)
+    {
+        reset(block.getColumns(), block);
+    }
+
+    /// Set the cursor to the beginning of the new block.
+    void reset(const Columns & columns, const Block & block)
     {
         all_columns.clear();
         sort_columns.clear();

-        size_t num_columns = block.columns();
+        size_t num_columns = columns.size();

         for (size_t j = 0; j < num_columns; ++j)
-            all_columns.push_back(block.safeGetByPosition(j).column.get());
+            all_columns.push_back(columns[j].get());

         for (size_t j = 0, size = desc.size(); j < size; ++j)
         {
-            size_t column_number = !desc[j].column_name.empty()
-                ? block.getPositionByName(desc[j].column_name)
-                : desc[j].column_number;
-
-            sort_columns.push_back(block.safeGetByPosition(column_number).column.get());
+            auto & column_desc = desc[j];
+            size_t column_number = !column_desc.column_name.empty()
+                ? block.getPositionByName(column_desc.column_name)
+                : column_desc.column_number;
+            sort_columns.push_back(columns[column_number].get());

             need_collation[j] = desc[j].collator != nullptr && typeid_cast<const ColumnString *>(sort_columns.back());    /// TODO Nullable(String)
             has_collation |= need_collation[j];
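The new constructor builds a cursor straight from a set of columns, so the sort description must refer to columns by position rather than by name. A minimal standalone sketch of the cursor idea, using plain std::vector columns and hypothetical names rather than ClickHouse's real types:

#include <cstddef>
#include <iostream>
#include <vector>

// Toy cursor over a "block" of parallel columns; tracks a position like SortCursorImpl.
struct ToyCursor
{
    const std::vector<std::vector<int>> * columns = nullptr;   // all columns, referenced by position
    std::vector<size_t> sort_column_numbers;                   // sort key: column positions, no names
    size_t pos = 0;
    size_t rows = 0;

    ToyCursor(const std::vector<std::vector<int>> & columns_, std::vector<size_t> sort_columns_)
        : columns(&columns_), sort_column_numbers(std::move(sort_columns_))
        , rows(columns_.empty() ? 0 : columns_[0].size()) {}

    bool isLast() const { return pos + 1 >= rows; }
    void next() { ++pos; }

    // Row-wise comparison over the sort key, analogous to SortCursor::greater.
    bool greater(const ToyCursor & rhs) const
    {
        for (size_t col : sort_column_numbers)
        {
            int a = (*columns)[col][pos];
            int b = (*rhs.columns)[col][rhs.pos];
            if (a != b)
                return a > b;
        }
        return false;
    }
};

int main()
{
    std::vector<std::vector<int>> block = {{1, 3, 5}, {7, 8, 9}};  // two columns, three rows
    ToyCursor lhs(block, {0});
    ToyCursor rhs(block, {0});
    rhs.next();                                   // rhs now points at row 1 (key 3)
    std::cout << lhs.greater(rhs) << '\n';        // 0: key 1 is not greater than key 3
}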
diff --git a/dbms/src/DataStreams/processConstants.cpp b/dbms/src/DataStreams/processConstants.cpp
index fff10afe780..31c81abd60a 100644
--- a/dbms/src/DataStreams/processConstants.cpp
+++ b/dbms/src/DataStreams/processConstants.cpp
@@ -9,7 +9,7 @@ void removeConstantsFromBlock(Block & block)
     size_t i = 0;
     while (i < columns)
     {
-        if (block.getByPosition(i).column->isColumnConst())
+        if (block.getByPosition(i).column && block.getByPosition(i).column->isColumnConst())
         {
             block.erase(i);
             --columns;
@@ -22,13 +22,14 @@

 void removeConstantsFromSortDescription(const Block & header, SortDescription & description)
 {
+    /// Note: This code is not correct if the column description contains column numbers instead of column names.
+    /// Hopefully, wherever it is used, the column description contains names.
     description.erase(std::remove_if(description.begin(), description.end(),
         [&](const SortColumnDescription & elem)
         {
-            if (!elem.column_name.empty())
-                return header.getByName(elem.column_name).column->isColumnConst();
-            else
-                return header.safeGetByPosition(elem.column_number).column->isColumnConst();
+            auto & column = !elem.column_name.empty() ? header.getByName(elem.column_name)
+                                                      : header.safeGetByPosition(elem.column_number);
+            return column.column && column.column->isColumnConst();
         }), description.end());
 }

@@ -41,7 +42,7 @@ void enrichBlockWithConstants(Block & block, const Block & header)
     for (size_t i = 0; i < columns; ++i)
     {
         const auto & col_type_name = header.getByPosition(i);
-        if (col_type_name.column->isColumnConst())
+        if (col_type_name.column && col_type_name.column->isColumnConst())
             block.insert(i, {col_type_name.column->cloneResized(rows), col_type_name.type, col_type_name.name});
     }
 }
diff --git a/dbms/src/Processors/Chunk.cpp b/dbms/src/Processors/Chunk.cpp
index d4726dfbd3b..e0beea48d21 100644
--- a/dbms/src/Processors/Chunk.cpp
+++ b/dbms/src/Processors/Chunk.cpp
@@ -92,6 +92,15 @@ MutableColumns Chunk::mutateColumns()
     return mut_columns;
 }

+MutableColumns Chunk::cloneEmptyColumns() const
+{
+    size_t num_columns = columns.size();
+    MutableColumns mut_columns(num_columns);
+    for (size_t i = 0; i < num_columns; ++i)
+        mut_columns[i] = columns[i]->cloneEmpty();
+    return mut_columns;
+}
+
 Columns Chunk::detachColumns()
 {
     num_rows = 0;
@@ -117,6 +126,15 @@ void Chunk::erase(size_t position)
     columns.erase(columns.begin() + position);
 }

+UInt64 Chunk::allocatedBytes() const
+{
+    UInt64 res = 0;
+    for (const auto & column : columns)
+        res += column->allocatedBytes();
+
+    return res;
+}
+
 void ChunkMissingValues::setBit(size_t column_idx, size_t row_idx)
 {
diff --git a/dbms/src/Processors/Chunk.h b/dbms/src/Processors/Chunk.h
index b13e30a9483..523e2a9a3db 100644
--- a/dbms/src/Processors/Chunk.h
+++ b/dbms/src/Processors/Chunk.h
@@ -32,6 +32,8 @@ public:
     void setColumns(MutableColumns columns_, UInt64 num_rows_);
     Columns detachColumns();
     MutableColumns mutateColumns();
+    /** Get empty columns with the same types as in the chunk.
+      */
+    MutableColumns cloneEmptyColumns() const;

     const ChunkInfoPtr & getChunkInfo() const { return chunk_info; }
     void setChunkInfo(ChunkInfoPtr chunk_info_) { chunk_info = std::move(chunk_info_); }
@@ -46,6 +48,8 @@ public:
     void clear();
     void erase(size_t position);

+    UInt64 allocatedBytes() const;
+
 private:
     Columns columns;
    UInt64 num_rows = 0;
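cloneEmptyColumns() and allocatedBytes() mirror the existing Block API on Chunk: fresh empty containers of the same dynamic type, and a byte count summed over columns. The pattern, sketched with a toy polymorphic column (IToyColumn is hypothetical, not the real IColumn interface):

#include <iostream>
#include <memory>
#include <vector>

struct IToyColumn
{
    virtual ~IToyColumn() = default;
    virtual std::unique_ptr<IToyColumn> cloneEmpty() const = 0;  // same dynamic type, no rows
    virtual size_t allocatedBytes() const = 0;
};

struct ToyIntColumn : IToyColumn
{
    std::vector<int> data;
    std::unique_ptr<IToyColumn> cloneEmpty() const override { return std::make_unique<ToyIntColumn>(); }
    size_t allocatedBytes() const override { return data.capacity() * sizeof(int); }
};

int main()
{
    std::vector<std::unique_ptr<IToyColumn>> chunk;
    chunk.push_back(std::make_unique<ToyIntColumn>());

    // Chunk::cloneEmptyColumns: one empty column of the same type per source column.
    std::vector<std::unique_ptr<IToyColumn>> empty;
    for (const auto & col : chunk)
        empty.push_back(col->cloneEmpty());

    // Chunk::allocatedBytes: sum over all columns.
    size_t bytes = 0;
    for (const auto & col : chunk)
        bytes += col->allocatedBytes();
    std::cout << empty.size() << ' ' << bytes << '\n';
}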
diff --git a/dbms/src/Processors/IAccumulatingTransform.cpp b/dbms/src/Processors/IAccumulatingTransform.cpp
index c4998562cf2..6f021b8afe6 100644
--- a/dbms/src/Processors/IAccumulatingTransform.cpp
+++ b/dbms/src/Processors/IAccumulatingTransform.cpp
@@ -71,5 +71,14 @@ void IAccumulatingTransform::work()
     }
 }

+void IAccumulatingTransform::setReadyChunk(Chunk chunk)
+{
+    if (current_output_chunk)
+        throw Exception("IAccumulatingTransform already has a ready output chunk. Cannot set another one. "
+                        "Probably, the setReadyChunk method was called twice per consume().", ErrorCodes::LOGICAL_ERROR);
+
+    current_output_chunk = std::move(chunk);
+}
+
 }
diff --git a/dbms/src/Processors/IAccumulatingTransform.h b/dbms/src/Processors/IAccumulatingTransform.h
index c8486ed4ce9..02b3bc36a30 100644
--- a/dbms/src/Processors/IAccumulatingTransform.h
+++ b/dbms/src/Processors/IAccumulatingTransform.h
@@ -25,6 +25,9 @@ protected:
     virtual void consume(Chunk chunk) = 0;
     virtual Chunk generate() = 0;

+    /// This method can be called at most once per consume() call, in case a chunk is already ready.
+    void setReadyChunk(Chunk chunk);
+
 public:
     IAccumulatingTransform(Block input_header, Block output_header);
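setReadyChunk lets consume() short-circuit a chunk straight to the output without waiting for the generate() phase. The control flow, reduced to a toy accumulator over ints (names are hypothetical, not the processors framework):

#include <iostream>
#include <optional>
#include <stdexcept>
#include <vector>

struct ToyAccumulator
{
    std::vector<int> accumulated;
    std::optional<int> ready;   // plays the role of current_output_chunk

    // May either accumulate the value or mark it as immediately ready.
    void consume(int value)
    {
        if (value < 0)          // pass-through case, like the empty sort description
        {
            if (ready)
                throw std::logic_error("setReadyChunk called twice per consume()");
            ready = value;
            return;
        }
        accumulated.push_back(value);
    }

    // Called after all input is consumed; drains the accumulated state.
    std::optional<int> generate()
    {
        if (accumulated.empty())
            return std::nullopt;
        int v = accumulated.back();
        accumulated.pop_back();
        return v;
    }
};

int main()
{
    ToyAccumulator acc;
    acc.consume(-1);
    if (acc.ready) std::cout << "ready early: " << *acc.ready << '\n';
    acc.consume(42);
    while (auto v = acc.generate()) std::cout << *v << '\n';
}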
+ */ + template + Chunk mergeImpl(std::priority_queue & queue); +}; + +MergeSorter::MergeSorter(Chunks & chunks_, SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_) + : chunks(chunks_), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_) +{ + Chunks nonempty_chunks; + for (auto & chunk : chunks) + { + if (chunk.getNumRows() == 0) + continue; + + cursors.emplace_back(chunk.getColumns(), description); + has_collation |= cursors.back().has_collation; + + nonempty_chunks.emplace_back(std::move(chunk)); + } + + chunks.swap(nonempty_chunks); + + if (!has_collation) + { + for (auto & cursor : cursors) + queue_without_collation.push(SortCursor(&cursor)); + } + else + { + for (auto & cursor : cursors) + queue_with_collation.push(SortCursorWithCollation(&cursor)); + } +} + + +Chunk MergeSorter::read() +{ + if (chunks.empty()) + return Chunk(); + + if (chunks.size() == 1) + { + auto res = std::move(chunks[0]); + chunks.clear(); + return res; + } + + return !has_collation + ? mergeImpl(queue_without_collation) + : mergeImpl(queue_with_collation); +} + + +template +Chunk MergeSorter::mergeImpl(std::priority_queue & queue) +{ + size_t num_columns = chunks[0].getNumColumns(); + + MutableColumns merged_columns = chunks[0].cloneEmptyColumns(); + /// TODO: reserve (in each column) + + /// Take rows from queue in right order and push to 'merged'. + size_t merged_rows = 0; + while (!queue.empty()) + { + TSortCursor current = queue.top(); + queue.pop(); + + for (size_t i = 0; i < num_columns; ++i) + merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); + + ++total_merged_rows; + ++merged_rows; + + if (!current->isLast()) + { + current->next(); + queue.push(current); + } + + if (limit && total_merged_rows == limit) + { + chunks.clear(); + return Chunk(std::move(merged_columns), merged_rows); + } + + if (merged_rows == max_merged_block_size) + return Chunk(std::move(merged_columns), merged_rows); + } + + if (merged_rows == 0) + return {}; + + return Chunk(std::move(merged_columns), merged_rows); +} + + +MergeSortingTransform::MergeSortingTransform( + const Block & header, + SortDescription & description_, + size_t max_merged_block_size_, UInt64 limit_, + size_t max_bytes_before_remerge_, + size_t max_bytes_before_external_sort_, const std::string & tmp_path_) + : IAccumulatingTransform(header, header) + , description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_) + , max_bytes_before_remerge(max_bytes_before_remerge_) + , max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_) +{ + auto & sample = getInputPort().getHeader(); + + /// Replace column names to column position in sort_description. + for (auto & column_description : description) + { + if (!column_description.column_name.empty()) + { + column_description.column_number = sample.getPositionByName(column_description.column_name); + column_description.column_name.clear(); + } + } + + /// Remove constants from header and map old indexes to new. + size_t num_columns = sample.columns(); + ColumnNumbers map(num_columns, num_columns); + for (size_t pos = 0; pos < num_columns; ++pos) + { + const auto & column = sample.getByPosition(pos); + if (!(column.column && column.column->isColumnConst())) + { + map[pos] = header_without_constants.columns(); + header_without_constants.insert(column); + } + } + + /// Remove constants from column_description and remap positions. 
+
+
+MergeSortingTransform::MergeSortingTransform(
+    const Block & header,
+    SortDescription & description_,
+    size_t max_merged_block_size_, UInt64 limit_,
+    size_t max_bytes_before_remerge_,
+    size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
+    : IAccumulatingTransform(header, header)
+    , description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
+    , max_bytes_before_remerge(max_bytes_before_remerge_)
+    , max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
+{
+    auto & sample = getInputPort().getHeader();
+
+    /// Replace column names with column positions in sort_description.
+    for (auto & column_description : description)
+    {
+        if (!column_description.column_name.empty())
+        {
+            column_description.column_number = sample.getPositionByName(column_description.column_name);
+            column_description.column_name.clear();
+        }
+    }
+
+    /// Remove constants from header and map old indexes to new.
+    size_t num_columns = sample.columns();
+    ColumnNumbers map(num_columns, num_columns);
+    const_columns_to_remove.assign(num_columns, true);
+    for (size_t pos = 0; pos < num_columns; ++pos)
+    {
+        const auto & column = sample.getByPosition(pos);
+        if (!(column.column && column.column->isColumnConst()))
+        {
+            map[pos] = header_without_constants.columns();
+            header_without_constants.insert(column);
+            const_columns_to_remove[pos] = false;
+        }
+    }
+
+    /// Remove constants from column_description and remap positions.
+    SortDescription description_without_constants;
+    description_without_constants.reserve(description.size());
+    for (const auto & column_description : description)
+    {
+        auto old_pos = column_description.column_number;
+        auto new_pos = map[old_pos];
+        if (new_pos < num_columns)
+        {
+            description_without_constants.push_back(column_description);
+            description_without_constants.back().column_number = new_pos;
+        }
+    }
+
+    description.swap(description_without_constants);
+}
+
+void MergeSortingTransform::consume(Chunk chunk)
+{
+    /** Algorithm:
+      * - read blocks from the source stream into memory;
+      * - if there are too many of them and external sorting is enabled,
+      *   - merge all blocks into a sorted stream and write it to a temporary file;
+      * - at the end, merge all sorted streams from the temporary files, together with the rest of the blocks in memory.
+      */
+
+    /// If there were only const columns in the sort description, then there is no need to sort.
+    /// Return the chunk as is.
+    if (description.empty())
+    {
+        setReadyChunk(std::move(chunk));
+        return;
+    }
+
+    removeConstColumns(chunk);
+
+    sum_rows_in_blocks += chunk.getNumRows();
+    sum_bytes_in_blocks += chunk.allocatedBytes();
+    chunks.push_back(std::move(chunk));
+
+    /** If a significant amount of data was accumulated, perform a preliminary merging step.
+      */
+    if (chunks.size() > 1
+        && limit
+        && limit * 2 < sum_rows_in_blocks   /// 2 is just a guess.
+        && remerge_is_useful
+        && max_bytes_before_remerge
+        && sum_bytes_in_blocks > max_bytes_before_remerge)
+    {
+        remerge();
+    }
+
+    /** If too much data was accumulated and external sorting is enabled,
+      * merge the blocks that we have in memory at this moment and write the merged stream to a temporary (compressed) file.
+      * NOTE. It's possible to check free space in the filesystem.
+      */
+    if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
+    {
+        Poco::File(tmp_path).createDirectories();
+        temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
+        const std::string & path = temporary_files.back()->path();
+        WriteBufferFromFile file_buf(path);
+        CompressedWriteBuffer compressed_buf(file_buf);
+        NativeBlockOutputStream block_out(compressed_buf, 0, header_without_constants);
+        MergeSorter merge_sorter(chunks, description, max_merged_block_size, limit);
+
+        LOG_INFO(log, "Sorting and writing part of data into temporary file " + path);
+        ProfileEvents::increment(ProfileEvents::ExternalSortWritePart);
+
+        /// NOTE. Possibly limit disk usage.
+        /// NOTE. This should be a separate processor.
+        /// TODO: Rewrite this code when processors are able to create other processors.
+        block_out.writePrefix();
+        while (auto next = merge_sorter.read())
+        {
+            auto block = header_without_constants.cloneWithColumns(next.detachColumns());
+            block_out.write(block);
+        }
+        block_out.writeSuffix();
+
+        LOG_INFO(log, "Done writing part of data into temporary file " + path);
+
+        chunks.clear();
+        sum_bytes_in_blocks = 0;
+        sum_rows_in_blocks = 0;
+    }
+}
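When sum_bytes_in_blocks crosses max_bytes_before_external_sort, everything accumulated so far is merged once and spilled to disk as one sorted run; the final generation phase then only has to merge a handful of sorted runs plus whatever stayed in memory. A compact standalone external-sort skeleton under those assumptions (plain text files stand in for compressed Native streams, and the final std::sort stands in for the k-way merge of sorted runs):

#include <algorithm>
#include <cstdio>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>

// Spill one sorted run to disk and forget it; mirrors the temporary-file branch in consume().
std::string spill(std::vector<int> & buffer, size_t run_index)
{
    std::sort(buffer.begin(), buffer.end());
    std::string path = "run_" + std::to_string(run_index) + ".tmp";
    std::ofstream out(path);
    for (int v : buffer)
        out << v << '\n';
    buffer.clear();                       // memory is released; only the file remains
    return path;
}

int main()
{
    std::vector<std::string> runs;
    std::vector<int> buffer;
    const size_t max_rows_in_memory = 4;  // stands in for max_bytes_before_external_sort

    for (int v : {9, 1, 8, 2, 7, 3, 6, 4, 5})
    {
        buffer.push_back(v);
        if (buffer.size() >= max_rows_in_memory)
            runs.push_back(spill(buffer, runs.size()));
    }

    // Final step (the generate() side): merge the on-disk runs with the in-memory rest.
    std::vector<int> result(buffer);
    for (const auto & path : runs)
    {
        std::ifstream in(path);
        for (int v; in >> v;)
            result.push_back(v);
        std::remove(path.c_str());
    }
    std::sort(result.begin(), result.end());
    for (int v : result)
        std::cout << v << ' ';
    std::cout << '\n';
}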
+
+void MergeSortingTransform::remerge()
+{
+    LOG_DEBUG(log, "Re-merging intermediate ORDER BY data (" << chunks.size() << " blocks with " << sum_rows_in_blocks << " rows) to save memory consumption");
+
+    /// NOTE Maybe concatenating all blocks and doing a partial sort would be faster than a merge?
+    MergeSorter merge_sorter(chunks, description, max_merged_block_size, limit);
+
+    Chunks new_chunks;
+    size_t new_sum_rows_in_blocks = 0;
+    size_t new_sum_bytes_in_blocks = 0;
+
+    while (auto chunk = merge_sorter.read())
+    {
+        new_sum_rows_in_blocks += chunk.getNumRows();
+        new_sum_bytes_in_blocks += chunk.allocatedBytes();
+        new_chunks.emplace_back(std::move(chunk));
+    }
+
+    LOG_DEBUG(log, "Memory usage is lowered from "
+        << formatReadableSizeWithBinarySuffix(sum_bytes_in_blocks) << " to "
+        << formatReadableSizeWithBinarySuffix(new_sum_bytes_in_blocks));
+
+    /// If the memory consumption was not lowered enough - we will not perform remerge anymore. 2 is a guess.
+    if (new_sum_bytes_in_blocks * 2 > sum_bytes_in_blocks)
+        remerge_is_useful = false;
+
+    chunks = std::move(new_chunks);
+    sum_rows_in_blocks = new_sum_rows_in_blocks;
+    sum_bytes_in_blocks = new_sum_bytes_in_blocks;
+}
+
+
+void MergeSortingTransform::removeConstColumns(Chunk & chunk)
+{
+    size_t num_columns = chunk.getNumColumns();
+    size_t num_rows = chunk.getNumRows();
+
+    if (num_columns != const_columns_to_remove.size())
+        throw Exception("Block has a different number of columns than header: " + toString(num_columns)
+                        + " vs " + toString(const_columns_to_remove.size()), ErrorCodes::LOGICAL_ERROR);
+
+    auto columns = chunk.detachColumns();
+    Columns columns_without_constants;
+    columns_without_constants.reserve(header_without_constants.columns());
+
+    for (size_t position = 0; position < num_columns; ++position)
+    {
+        if (!const_columns_to_remove[position])
+            columns_without_constants.push_back(std::move(columns[position]));
+    }
+
+    chunk.setColumns(std::move(columns_without_constants), num_rows);
+}
+
+
+}
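The remerge step only pays off when there is a LIMIT: merging K sorted blocks and keeping just the first `limit` rows shrinks memory, and the transform stops trying once the saving falls under 2x. The decision logic in isolation (field names as in the patch, everything else hypothetical):

#include <cstddef>
#include <iostream>

// Mirrors the condition guarding remerge() in consume().
bool should_remerge(size_t num_chunks, size_t limit, size_t sum_rows,
                    bool remerge_is_useful, size_t max_bytes_before_remerge, size_t sum_bytes)
{
    return num_chunks > 1
        && limit                          // remerge can only drop rows past the LIMIT
        && limit * 2 < sum_rows           // 2 is just a guess: enough excess rows to drop
        && remerge_is_useful
        && max_bytes_before_remerge
        && sum_bytes > max_bytes_before_remerge;
}

int main()
{
    // 10 chunks, LIMIT 1000, 5000 accumulated rows, 512 MiB accumulated, 256 MiB threshold:
    std::cout << should_remerge(10, 1000, 5000, true, 256 << 20, size_t(512) << 20) << '\n';  // 1
    // Without a LIMIT there is nothing to cut, so remerge is never attempted:
    std::cout << should_remerge(10, 0, 5000, true, 256 << 20, size_t(512) << 20) << '\n';     // 0
}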
diff --git a/dbms/src/Processors/Transforms/MergeSortingTransform.h b/dbms/src/Processors/Transforms/MergeSortingTransform.h
new file mode 100644
index 00000000000..1e960deeada
--- /dev/null
+++ b/dbms/src/Processors/Transforms/MergeSortingTransform.h
@@ -0,0 +1,82 @@
+#pragma once
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+#include
+
+namespace DB
+{
+
+class MergeSortingTransform : public IAccumulatingTransform
+{
+public:
+    /// limit - if not 0, allowed to return just the first 'limit' rows in sorted order.
+    MergeSortingTransform(const Block & header,
+                          SortDescription & description_,
+                          size_t max_merged_block_size_, UInt64 limit_,
+                          size_t max_bytes_before_remerge_,
+                          size_t max_bytes_before_external_sort_, const std::string & tmp_path_);
+
+    String getName() const override { return "MergeSortingTransform"; }
+
+protected:
+    void consume(Chunk chunk) override;
+    Chunk generate() override;
+
+private:
+    SortDescription description;
+    size_t max_merged_block_size;
+    UInt64 limit;
+
+    size_t max_bytes_before_remerge;
+    size_t max_bytes_before_external_sort;
+    const std::string tmp_path;
+
+    Logger * log = &Logger::get("MergeSortingTransform");
+
+    Chunks chunks;
+    size_t sum_rows_in_blocks = 0;
+    size_t sum_bytes_in_blocks = 0;
+
+    /// If remerge doesn't save memory at least several times, mark it as useless and don't do it anymore.
+    bool remerge_is_useful = true;
+
+    /// Before the operation, constant columns are removed from blocks; afterwards, they are placed back
+    /// (to avoid excessive virtual function calls and because constants cannot be serialized in Native format for temporary files).
+    /// The block structure without constant columns is saved here.
+    Block header_without_constants;
+    /// Columns which were constant in the header and need to be removed from chunks.
+    std::vector<bool> const_columns_to_remove;
+
+    /// Everything below is for external sorting.
+    std::vector<std::unique_ptr<Poco::TemporaryFile>> temporary_files;
+
+    /// For reading data from a temporary file.
+    struct TemporaryFileStream
+    {
+        ReadBufferFromFile file_in;
+        CompressedReadBuffer compressed_in;
+        BlockInputStreamPtr block_in;
+
+        TemporaryFileStream(const std::string & path, const Block & header)
+            : file_in(path), compressed_in(file_in), block_in(std::make_shared<NativeBlockInputStream>(compressed_in, header, 0)) {}
+    };
+
+    std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
+
+    BlockInputStreams inputs_to_merge;
+
+    /// Merge all accumulated blocks to keep no more than limit rows.
+    void remerge();
+
+    void removeConstColumns(Chunk & chunk);
+};
+
+}
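TemporaryFileStream relies on member declaration order: file_in must outlive compressed_in, which must outlive block_in, so they are declared (and therefore constructed) in exactly that order. The same layering with standard streams (toy "decompressor", hypothetical names):

#include <fstream>
#include <iostream>
#include <sstream>
#include <string>

// Layered readers, constructed bottom-up like file_in -> compressed_in -> block_in.
struct ToyFileStream
{
    std::ifstream file_in;          // raw bytes from disk
    std::stringstream decoded_in;   // stands in for CompressedReadBuffer

    explicit ToyFileStream(const std::string & path) : file_in(path)
    {
        // "Decompress" by copying; a real implementation would decode here.
        decoded_in << file_in.rdbuf();
    }

    bool readRow(int & value) { return static_cast<bool>(decoded_in >> value); }
};

int main()
{
    { std::ofstream out("toy.tmp"); out << "10 20 30\n"; }
    ToyFileStream stream("toy.tmp");
    for (int v; stream.readRow(v);)
        std::cout << v << ' ';      // 10 20 30
    std::cout << '\n';
}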
diff --git a/dbms/src/Processors/Transforms/MergingSortedTransform.cpp b/dbms/src/Processors/Transforms/MergingSortedTransform.cpp
new file mode 100644
index 00000000000..b459e74172a
--- /dev/null
+++ b/dbms/src/Processors/Transforms/MergingSortedTransform.cpp
@@ -0,0 +1,302 @@
+#include
+#include
+#include
+
+namespace DB
+{
+
+MergingSortedTransform::MergingSortedTransform(
+    const Block & header,
+    size_t num_inputs,
+    const SortDescription & description_,
+    size_t max_block_size,
+    UInt64 limit,
+    bool quiet)
+    : IProcessor(InputPorts(num_inputs, header), {header})
+    , description(description_), max_block_size(max_block_size), limit(limit), quiet(quiet)
+    , merged_data(header), source_chunks(num_inputs), cursors(num_inputs)
+{
+    auto & sample = outputs[0].getHeader();
+    /// Replace column names in the description with positions.
+    for (auto & column_description : description)
+    {
+        has_collation |= column_description.collator != nullptr;
+        if (!column_description.column_name.empty())
+        {
+            column_description.column_number = sample.getPositionByName(column_description.column_name);
+            column_description.column_name.clear();
+        }
+    }
+}
+
+IProcessor::Status MergingSortedTransform::prepare()
+{
+    auto & output = outputs[0];
+
+    /// Special case for no inputs.
+    if (inputs.empty())
+    {
+        output.finish();
+        return Status::Finished;
+    }
+
+    /// Check if we can output.
+
+    if (output.isFinished())
+    {
+        for (auto & in : inputs)
+            in.close();
+
+        return Status::Finished;
+    }
+
+    if (!output.isNeeded())
+    {
+        for (auto & in : inputs)
+            in.setNotNeeded();
+
+        return Status::PortFull;
+    }
+
+    if (output.hasData())
+        return Status::PortFull;
+
+    /// Special case for a single input.
+    if (inputs.size() == 1)
+    {
+        auto & input = inputs[0];
+        if (input.isFinished())
+        {
+            output.finish();
+            return Status::Finished;
+        }
+
+        input.setNeeded();
+        if (input.hasData())
+            output.push(input.pull());
+
+        return Status::NeedData;
+    }
+
+    /// Push if we have data.
+    if (merged_data.mergedRows())
+    {
+        output.push(merged_data.pull());
+        return Status::PortFull;
+    }
+
+    if (!is_initialized)
+    {
+        /// Check for the inputs we need.
+        bool all_inputs_have_data = true;
+        for (size_t i = 0; i < inputs.size(); ++i)
+        {
+            if (inputs[i].isFinished())
+                continue;
+
+            if (!cursors[i].empty())
+            {
+                inputs[i].setNotNeeded();
+                continue;
+            }
+
+            inputs[i].setNeeded();
+
+            if (!inputs[i].hasData())
+            {
+                all_inputs_have_data = false;
+                continue;
+            }
+
+            auto chunk = inputs[i].pull();
+            if (chunk.hasNoRows())
+            {
+                all_inputs_have_data = false;
+                continue;
+            }
+
+            updateCursor(std::move(chunk), i);
+        }
+
+        if (!all_inputs_have_data)
+            return Status::NeedData;
+
+        if (has_collation)
+            initQueue(queue_with_collation);
+        else
+            initQueue(queue_without_collation);
+
+        is_initialized = true;
+        return Status::Ready;
+    }
+    else
+    {
+        if (is_finished)
+        {
+            for (auto & input : inputs)
+                input.close();
+
+            outputs[0].finish();
+
+            return Status::Finished;
+        }
+
+        if (need_data)
+        {
+            auto & input = inputs[next_input_to_read];
+            if (!input.isFinished())
+            {
+                input.setNeeded();
+
+                if (!input.hasData())
+                    return Status::NeedData;
+
+                updateCursor(input.pull(), next_input_to_read);
+                pushToQueue(next_input_to_read);
+                need_data = false;
+            }
+        }
+
+        return Status::Ready;
+    }
+}
+
+void MergingSortedTransform::work()
+{
+    if (has_collation)
+        merge(queue_with_collation);
+    else
+        merge(queue_without_collation);
+}
+
+template <typename TSortCursor>
+void MergingSortedTransform::merge(std::priority_queue<TSortCursor> & queue)
+{
+    /// Returns true if we can continue adding rows to the current output chunk.
+    auto can_read_another_row = [&, this]()
+    {
+        if (limit && merged_data.totalMergedRows() >= limit)
+        {
+            //std::cerr << "Limit reached\n";
+            is_finished = true;
+            return false;
+        }
+
+        if (merged_data.mergedRows() >= max_block_size)
+        {
+            //std::cerr << "max_block_size reached\n";
+            return false;
+        }
+
+        return true;
+    };
+
+    /// Take rows in the required order and put them into `merged_data`, while the number of rows is no more than `max_block_size`.
+    while (!queue.empty())
+    {
+        /// Shouldn't happen at the first iteration, but check just in case.
+        if (!can_read_another_row())
+            return;
+
+        TSortCursor current = queue.top();
+        queue.pop();
+        bool first_iteration = true;
+
+        while (true)
+        {
+            if (!first_iteration && !can_read_another_row())
+            {
+                queue.push(current);
+                return;
+            }
+            first_iteration = false;
+
+            /** And what if the block is totally less or equal than the rest for the current cursor?
+              * Or is there only one data source left in the queue? Then the entire block of the current cursor can be taken.
+              */
+            if (current.impl->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top())))
+            {
+                //std::cerr << "current block is totally less or equals\n";
+
+                /// If there is already data in the current block, we first return it. We'll get here again the next time we call the merge function.
+                if (merged_data.mergedRows() != 0)
+                {
+                    //std::cerr << "merged rows is non-zero\n";
+                    queue.push(current);
+                    return;
+                }
+
+                /// Actually, current.impl->order stores the source number (i.e. cursors[current.impl->order] == current.impl)
+                size_t source_num = current.impl->order;
+                insertFromChunk(source_num);
+                return;
+            }
+
+            //std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
+            //std::cerr << "Inserting row\n";
+            merged_data.insertRow(current->all_columns, current->pos);
+
+            if (out_row_sources_buf)
+            {
+                /// Actually, current.impl->order stores the source number (i.e. cursors[current.impl->order] == current.impl)
+                RowSourcePart row_source(current.impl->order);
+                out_row_sources_buf->write(row_source.data);
+            }
+
+            if (current->isLast())
+            {
+                need_data = true;
+                next_input_to_read = current.impl->order;
+
+                if (limit && merged_data.totalMergedRows() >= limit)
+                    is_finished = true;
+
+                return;
+            }
+
+            //std::cerr << "moving to next row\n";
+            current->next();
+
+            if (!queue.empty() && current.greater(queue.top()))
+            {
+                //std::cerr << "next row is not least, pushing back to queue\n";
+                queue.push(current);
+                break;
+            }
+        }
+    }
+}
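The fast path checks whether the cursor's whole remaining block can be appended at once: if the block's last row is less than or equal to the smallest head row among the other cursors, no interleaving is possible. The idea with sorted vectors (a hypothetical helper, not the real totallyLessOrEquals):

#include <iostream>
#include <vector>

// True if all of `block` may be emitted before `other_min` without breaking the order.
bool totally_less_or_equals(const std::vector<int> & block, int other_min)
{
    return block.empty() || block.back() <= other_min;   // compare the last (largest) row
}

int main()
{
    std::vector<int> current = {1, 2, 3};
    int queue_top_head = 4;                              // smallest head row among other cursors
    if (totally_less_or_equals(current, queue_top_head))
        std::cout << "take the whole block without row-by-row merging\n";
}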
+
+void MergingSortedTransform::insertFromChunk(size_t source_num)
+{
+    if (source_num >= cursors.size())
+        throw Exception("Logical error in MergingSortedTransform", ErrorCodes::LOGICAL_ERROR);
+
+    //std::cerr << "copied columns\n";
+
+    auto num_rows = source_chunks[source_num]->getNumRows();
+
+    UInt64 total_merged_rows_after_insertion = merged_data.totalMergedRows() + num_rows;
+    if (limit && total_merged_rows_after_insertion > limit)
+    {
+        /// Cut the chunk so that exactly `limit` rows are merged in total.
+        num_rows = limit - merged_data.totalMergedRows();
+        merged_data.insertFromChunk(std::move(*source_chunks[source_num]), num_rows);
+        is_finished = true;
+    }
+    else
+    {
+        merged_data.insertFromChunk(std::move(*source_chunks[source_num]), 0);
+        need_data = true;
+        next_input_to_read = source_num;
+    }
+
+    if (out_row_sources_buf)
+    {
+        RowSourcePart row_source(source_num);
+        for (size_t i = 0; i < num_rows; ++i)
+            out_row_sources_buf->write(row_source.data);
+    }
+}
+
+
+}
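SharedChunk's ownership works through boost::intrusive_ptr: the reference count lives inside the object itself, and the two free functions are the customization points the library finds by argument-dependent lookup. A standalone sketch of the same pattern (needs only the Boost.SmartPtr headers):

#include <boost/intrusive_ptr.hpp>
#include <iostream>

struct Shared
{
    int refcount = 0;      // non-atomic, single-thread use, as in detail::SharedChunk
    int payload = 0;
};

// Customization points found by argument-dependent lookup.
inline void intrusive_ptr_add_ref(Shared * ptr) { ++ptr->refcount; }
inline void intrusive_ptr_release(Shared * ptr)
{
    if (0 == --ptr->refcount)
        delete ptr;
}

int main()
{
    boost::intrusive_ptr<Shared> a(new Shared{});   // refcount = 1
    {
        boost::intrusive_ptr<Shared> b = a;         // refcount = 2, no separate control block
        std::cout << a->refcount << '\n';           // 2
    }
    std::cout << a->refcount << '\n';               // 1; object deleted when `a` goes away
}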
diff --git a/dbms/src/Processors/Transforms/MergingSortedTransform.h b/dbms/src/Processors/Transforms/MergingSortedTransform.h
new file mode 100644
index 00000000000..9eb86252314
--- /dev/null
+++ b/dbms/src/Processors/Transforms/MergingSortedTransform.h
@@ -0,0 +1,250 @@
+#pragma once
+#include
+#include
+#include
+
+#include
+
+namespace DB
+{
+
+/// Allows you to refer to the row in the block and hold the block ownership,
+/// and thus avoid creating a temporary row object.
+/// Does not use std::shared_ptr, since there is no need for a place for `weak_count` and `deleter`;
+/// does not use Poco::SharedPtr, since a block and `refcount` need to be allocated in one piece;
+/// does not use Poco::AutoPtr, since it does not have a `move` constructor and there are extra checks for nullptr.
+/// The reference counter is not atomic, since it is used from one thread.
+namespace detail
+{
+struct SharedChunk : Chunk
+{
+    int refcount = 0;
+
+    ColumnRawPtrs all_columns;
+    ColumnRawPtrs sort_columns;
+
+    SharedChunk(Chunk && chunk) : Chunk(std::move(chunk)) {}
+};
+
+//template <typename TSortCursor>
+//class Queue
+//{
+//public:
+//    bool empty() const { return queue.empty(); }
+//    void push(TSortCursor cursor) { queue.push(std::move(cursor)); }
+//
+//    bool needUpdateCursor() const { return !empty() && !queue.top()->isLast(); }
+//
+//    void updateCursor(TSortCursor cursor)
+//    {
+//        if (!needUpdateCursor())
+//            throw Exception("Do not need to update cursor for sort cursor queue.", ErrorCodes::LOGICAL_ERROR);
+//
+//        if (cursor->order != queue.top()->order)
+//            throw Exception("Cannot update cursor for sort cursor queue because top cursor order "
+//                            "(" + toString(queue.top()->order) + ") is not equal to new cursor order "
+//                            "(" + toString(cursor->order) + ").", ErrorCodes::LOGICAL_ERROR);
+//        queue.pop();
+//        queue.push(cursor);
+//    }
+//
+//    void dropCursor()
+//    {
+//        if (!needUpdateCursor())
+//            throw Exception("Do not need to update cursor for sort cursor queue.", ErrorCodes::LOGICAL_ERROR);
+//
+//        queue.pop();
+//    }
+//
+//    const TSortCursor & top() const
+//    {
+//        if (needUpdateCursor())
+//            throw Exception("Cannot get top element from sort cursor queue because "
+//                            "need to update cursor.", ErrorCodes::LOGICAL_ERROR);
+//
+//        return queue.top();
+//    }
+//
+//    void pop()
+//    {
+//        if (needUpdateCursor())
+//            throw Exception("Cannot pop element from sort cursor queue because "
+//                            "need to update cursor.", ErrorCodes::LOGICAL_ERROR);
+//        queue.pop();
+//    }
+//
+//private:
+//    /// Queue with SortCursors.
+//    using PriorityQueue = std::priority_queue<TSortCursor>;
+//    PriorityQueue queue;
+//};
+
+}
+
+using SharedChunkPtr = boost::intrusive_ptr<detail::SharedChunk>;
+
+
+inline void intrusive_ptr_add_ref(detail::SharedChunk * ptr)
+{
+    ++ptr->refcount;
+}
+
+inline void intrusive_ptr_release(detail::SharedChunk * ptr)
+{
+    if (0 == --ptr->refcount)
+        delete ptr;
+}
+
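MergedData::pull (declared in the class below) hands off the accumulated columns and leaves fresh empty ones behind, so merging can continue into the next chunk: build empties, swap, wrap the full set. The pattern in miniature, with one toy column:

#include <iostream>
#include <utility>
#include <vector>

struct ToyMergedData
{
    std::vector<int> column;    // one accumulated column for brevity
    size_t merged_rows = 0;

    void insertRow(int v) { column.push_back(v); ++merged_rows; }

    // Like MergedData::pull: return the filled column, keep an empty one of the same shape.
    std::vector<int> pull()
    {
        std::vector<int> empty;          // cloneEmpty() analogue
        empty.swap(column);              // `column` is now empty, `empty` holds the data
        merged_rows = 0;
        return empty;                    // ownership moves out with the return
    }
};

int main()
{
    ToyMergedData data;
    data.insertRow(1);
    data.insertRow(2);
    auto chunk = data.pull();
    std::cout << chunk.size() << ' ' << data.column.size() << '\n';   // 2 0
}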
+class MergingSortedTransform : public IProcessor
+{
+public:
+    MergingSortedTransform(
+        const Block & header,
+        size_t num_inputs,
+        const SortDescription & description_,
+        size_t max_block_size,
+        UInt64 limit = 0,
+        bool quiet = false);
+
+    String getName() const override { return "MergingSortedTransform"; }
+    Status prepare() override;
+    void work() override;
+
+protected:
+
+    class MergedData
+    {
+    public:
+        explicit MergedData(const Block & header)
+        {
+            columns.reserve(header.columns());
+            for (const auto & column : header)
+                columns.emplace_back(column.type->createColumn());
+        }
+
+        void insertRow(const ColumnRawPtrs & raw_columns, size_t row)
+        {
+            size_t num_columns = raw_columns.size();
+            for (size_t i = 0; i < num_columns; ++i)
+                columns[i]->insertFrom(*raw_columns[i], row);
+
+            ++total_merged_rows;
+            ++merged_rows;
+        }
+
+        void insertFromChunk(Chunk && chunk, size_t limit)
+        {
+            if (merged_rows)
+                throw Exception("Cannot insert into MergedData from Chunk because MergedData is not empty.",
+                                ErrorCodes::LOGICAL_ERROR);
+
+            auto num_rows = chunk.getNumRows();
+            columns = chunk.mutateColumns();
+            if (limit && num_rows > limit)
+            {
+                num_rows = limit;
+                for (auto & column : columns)
+                    column = (*column->cut(0, limit)).mutate();
+            }
+
+            total_merged_rows += num_rows;
+            merged_rows = num_rows;
+        }
+
+        Chunk pull()
+        {
+            MutableColumns empty_columns;
+            empty_columns.reserve(columns.size());
+
+            for (const auto & column : columns)
+                empty_columns.emplace_back(column->cloneEmpty());
+
+            empty_columns.swap(columns);
+            Chunk chunk(std::move(empty_columns), merged_rows);
+            merged_rows = 0;
+
+            return chunk;
+        }
+
+        UInt64 totalMergedRows() const { return total_merged_rows; }
+        UInt64 mergedRows() const { return merged_rows; }
+
+    private:
+        UInt64 total_merged_rows = 0;
+        UInt64 merged_rows = 0;
+        MutableColumns columns;
+    };
+
+    /// Settings
+    SortDescription description;
+    const size_t max_block_size;
+    UInt64 limit;
+    bool has_collation = false;
+    bool quiet = false;
+
+    MergedData merged_data;
+
+    /// Used in the Vertical merge algorithm to gather non-PK/non-index columns (on the next step).
+    /// If it is not nullptr then it should be populated during execution.
+    WriteBuffer * out_row_sources_buf = nullptr;
+
+    /// Chunks currently being merged.
+    std::vector<SharedChunkPtr> source_chunks;
+
+    using CursorImpls = std::vector<SortCursorImpl>;
+    CursorImpls cursors;
+
+    using Queue = std::priority_queue<SortCursor>;
+    Queue queue_without_collation;
+
+    using QueueWithCollation = std::priority_queue<SortCursorWithCollation>;
+    QueueWithCollation queue_with_collation;
+
+private:
+
+    /// Processor state.
+    bool is_initialized = false;
+    bool is_finished = false;
+    bool need_data = false;
+    size_t next_input_to_read = 0;
+
+    template <typename TSortCursor>
+    void merge(std::priority_queue<TSortCursor> & queue);
+
+    void insertFromChunk(size_t source_num);
+
+    void updateCursor(Chunk chunk, size_t source_num)
+    {
+        auto & shared_chunk_ptr = source_chunks[source_num];
+
+        if (!shared_chunk_ptr)
+        {
+            shared_chunk_ptr = new detail::SharedChunk(std::move(chunk));
+            cursors[source_num] = SortCursorImpl(shared_chunk_ptr->getColumns(), description, source_num);
+            has_collation |= cursors[source_num].has_collation;
+        }
+        else
+        {
+            *shared_chunk_ptr = std::move(chunk);
+            cursors[source_num].reset(shared_chunk_ptr->getColumns(), {});
+        }
+
+        shared_chunk_ptr->all_columns = cursors[source_num].all_columns;
+        shared_chunk_ptr->sort_columns = cursors[source_num].sort_columns;
+    }
+
+    void pushToQueue(size_t source_num)
+    {
+        if (has_collation)
+            queue_with_collation.push(SortCursorWithCollation(&cursors[source_num]));
+        else
+            queue_without_collation.push(SortCursor(&cursors[source_num]));
+    }
+
+    template <typename TSortCursor>
+    void initQueue(std::priority_queue<TSortCursor> & queue)
+    {
+        for (auto & cursor : cursors)
+            if (!cursor.empty())
+                queue.push(TSortCursor(&cursor));
+    }
+};
+
+}
diff --git a/dbms/src/Processors/Transforms/PartialSortingTransform.cpp b/dbms/src/Processors/Transforms/PartialSortingTransform.cpp
new file mode 100644
index 00000000000..151a91ee2c8
--- /dev/null
+++ b/dbms/src/Processors/Transforms/PartialSortingTransform.cpp
@@ -0,0 +1,22 @@
+#include
+#include
+
+namespace DB
+{
+
+PartialSortingTransform::PartialSortingTransform(const Block & header, SortDescription & description, UInt64 limit)
+    : ISimpleTransform(header, header, false)
+    , description(description), limit(limit)
+{
+}
+
+void PartialSortingTransform::transform(Chunk & chunk)
+{
+    auto num_rows = chunk.getNumRows();
+    auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
+    chunk.clear();
+    sortBlock(block, description, limit);
+    chunk.setColumns(block.getColumns(), num_rows);
+}
+
+}
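With a non-zero limit, a block only needs its first `limit` rows in order; the tail may stay unsorted. The standard-library analogue of what sortBlock is allowed to do in that case:

#include <algorithm>
#include <iostream>
#include <vector>

int main()
{
    std::vector<int> rows = {9, 1, 8, 2, 7, 3};
    const size_t limit = 3;

    // Order only the first `limit` elements; everything after them is unspecified.
    std::partial_sort(rows.begin(), rows.begin() + limit, rows.end());

    for (size_t i = 0; i < limit; ++i)
        std::cout << rows[i] << ' ';    // 1 2 3
    std::cout << '\n';
}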
+ */ +class PartialSortingTransform : public ISimpleTransform +{ +public: + /// limit - if not 0, then you can sort each block not completely, but only `limit` first rows by order. + PartialSortingTransform(const Block & header, SortDescription & description, UInt64 limit = 0); + + String getName() const override { return "PartialSortingTransform"; } + +protected: + void transform(Chunk & chunk) override; + +private: + SortDescription description; + UInt64 limit; +}; + +}