Added MergingSortedTransform.

Nikolai Kochetov 2019-02-26 21:40:08 +03:00
parent c8c2e1fd82
commit 2f98e779e1
12 changed files with 1081 additions and 13 deletions

View File

@@ -3,6 +3,7 @@
#include <Common/typeid_cast.h>
#include <Core/SortDescription.h>
#include <Core/Block.h>
#include <Core/ColumnNumbers.h>
#include <Columns/IColumn.h>
#include <Columns/ColumnString.h>
@@ -47,26 +48,44 @@ struct SortCursorImpl
reset(block);
}
SortCursorImpl(const Columns & columns, const SortDescription & desc_, size_t order_ = 0)
: desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size())
{
for (auto & column_desc : desc)
{
if (!column_desc.column_name.empty())
throw Exception("SortDescription should contain column positions if SortCursor is used without header.",
ErrorCodes::LOGICAL_ERROR);
}
reset(columns, {});
}
bool empty() const { return rows == 0; }
/// Set the cursor to the beginning of the new block.
void reset(const Block & block)
{
reset(block.getColumns(), block);
}
/// Set the cursor to the beginning of the new block.
void reset(const Columns & columns, const Block & block)
{
all_columns.clear();
sort_columns.clear();
- size_t num_columns = block.columns();
+ size_t num_columns = columns.size();
for (size_t j = 0; j < num_columns; ++j)
- all_columns.push_back(block.safeGetByPosition(j).column.get());
+ all_columns.push_back(columns[j].get());
for (size_t j = 0, size = desc.size(); j < size; ++j)
{
- size_t column_number = !desc[j].column_name.empty()
-     ? block.getPositionByName(desc[j].column_name)
-     : desc[j].column_number;
- sort_columns.push_back(block.safeGetByPosition(column_number).column.get());
+ auto & column_desc = desc[j];
+ size_t column_number = !column_desc.column_name.empty()
+     ? block.getPositionByName(column_desc.column_name)
+     : column_desc.column_number;
+ sort_columns.push_back(columns[column_number].get());
need_collation[j] = desc[j].collator != nullptr && typeid_cast<const ColumnString *>(sort_columns.back()); /// TODO Nullable(String)
has_collation |= need_collation[j];
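The cursor machinery above reduces to one operation: compare two (columns, row position) pairs lexicographically over the sort columns. Below is a minimal standalone sketch of that comparison using plain STL vectors instead of IColumn; the names in it (rowLess, SortColumnDesc) are illustrative, not from the codebase.

#include <cassert>
#include <vector>

struct SortColumnDesc { size_t column_number; int direction; /* 1 = ascending, -1 = descending */ };

/// True if row `a` sorts before row `b` under the given description.
bool rowLess(const std::vector<std::vector<int>> & columns,
             const std::vector<SortColumnDesc> & desc, size_t a, size_t b)
{
    for (const auto & d : desc)
    {
        const auto & col = columns[d.column_number];
        if (col[a] != col[b])
            return d.direction > 0 ? col[a] < col[b] : col[a] > col[b];
    }
    return false; /// rows are equal on all sort columns
}

int main()
{
    std::vector<std::vector<int>> columns = {{1, 1, 2}, {9, 5, 0}};
    std::vector<SortColumnDesc> desc = {{0, 1}, {1, -1}};   /// by column 0 asc, then column 1 desc
    assert(rowLess(columns, desc, 0, 1));   /// equal on column 0; 9 > 5, so row 0 comes first (descending)
    assert(rowLess(columns, desc, 1, 2));   /// 1 < 2 on column 0
}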

View File

@@ -9,7 +9,7 @@ void removeConstantsFromBlock(Block & block)
size_t i = 0;
while (i < columns)
{
- if (block.getByPosition(i).column->isColumnConst())
+ if (block.getByPosition(i).column && block.getByPosition(i).column->isColumnConst())
{
block.erase(i);
--columns;
@@ -22,13 +22,14 @@ void removeConstantsFromBlock(Block & block)
void removeConstantsFromSortDescription(const Block & header, SortDescription & description)
{
/// Note: This code is not correct if the sort description contains column numbers instead of column names.
/// Hopefully, everywhere it is used, the sort description contains names.
description.erase(std::remove_if(description.begin(), description.end(),
[&](const SortColumnDescription & elem)
{
- if (!elem.column_name.empty())
-     return header.getByName(elem.column_name).column->isColumnConst();
- else
-     return header.safeGetByPosition(elem.column_number).column->isColumnConst();
+ auto & column = !elem.column_name.empty() ? header.getByName(elem.column_name)
+     : header.safeGetByPosition(elem.column_number);
+ return column.column && column.column->isColumnConst();
}), description.end());
}
@@ -41,7 +42,7 @@ void enrichBlockWithConstants(Block & block, const Block & header)
for (size_t i = 0; i < columns; ++i)
{
const auto & col_type_name = header.getByPosition(i);
- if (col_type_name.column->isColumnConst())
+ if (col_type_name.column && col_type_name.column->isColumnConst())
block.insert(i, {col_type_name.column->cloneResized(rows), col_type_name.type, col_type_name.name});
}
}
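Both this file and MergeSortingTransform below follow the same pattern: drop constant columns, then remap position-based sort descriptions through an old-to-new index map with a sentinel value for removed positions. A hedged standalone sketch of the remap (plain STL, illustrative names):

#include <cassert>
#include <vector>

int main()
{
    /// Which header columns are constant (these get dropped).
    std::vector<bool> is_const = {false, true, false, true};
    size_t num_columns = is_const.size();

    /// map[old_pos] = new_pos; num_columns acts as a "removed" sentinel,
    /// as the ColumnNumbers map does in MergeSortingTransform's constructor.
    std::vector<size_t> map(num_columns, num_columns);
    size_t new_pos = 0;
    for (size_t pos = 0; pos < num_columns; ++pos)
        if (!is_const[pos])
            map[pos] = new_pos++;

    /// A position-based sort description: drop entries that point at removed
    /// columns and remap the rest.
    std::vector<size_t> description = {2, 1};
    std::vector<size_t> description_without_constants;
    for (size_t old_pos : description)
        if (map[old_pos] < num_columns)
            description_without_constants.push_back(map[old_pos]);

    assert((description_without_constants == std::vector<size_t>{1}));
}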

View File

@@ -92,6 +92,15 @@ MutableColumns Chunk::mutateColumns()
return mut_columns;
}
MutableColumns Chunk::cloneEmptyColumns() const
{
size_t num_columns = columns.size();
MutableColumns mut_columns(num_columns);
for (size_t i = 0; i < num_columns; ++i)
mut_columns[i] = columns[i]->cloneEmpty();
return mut_columns;
}
Columns Chunk::detachColumns()
{
num_rows = 0;
@@ -117,6 +126,15 @@ void Chunk::erase(size_t position)
columns.erase(columns.begin() + position);
}
UInt64 Chunk::allocatedBytes() const
{
UInt64 res = 0;
for (const auto & column : columns)
res += column->allocatedBytes();
return res;
}
void ChunkMissingValues::setBit(size_t column_idx, size_t row_idx)
{

View File

@@ -32,6 +32,8 @@ public:
void setColumns(MutableColumns columns_, UInt64 num_rows_);
Columns detachColumns();
MutableColumns mutateColumns();
/** Get empty columns with the same types as in the chunk. */
MutableColumns cloneEmptyColumns() const;
const ChunkInfoPtr & getChunkInfo() const { return chunk_info; }
void setChunkInfo(ChunkInfoPtr chunk_info_) { chunk_info = std::move(chunk_info_); }
@@ -46,6 +48,8 @@ public:
void clear();
void erase(size_t position);
UInt64 allocatedBytes() const;
private:
Columns columns;
UInt64 num_rows = 0;

View File

@@ -71,5 +71,14 @@ void IAccumulatingTransform::work()
}
}
void IAccumulatingTransform::setReadyChunk(Chunk chunk)
{
if (current_output_chunk)
throw Exception("IAccumulatingTransform already has a ready chunk. Cannot set another one. "
"Probably, the setReadyChunk method was called twice per consume().", ErrorCodes::LOGICAL_ERROR);
current_output_chunk = std::move(chunk);
}
}

View File

@@ -25,6 +25,9 @@ protected:
virtual void consume(Chunk chunk) = 0;
virtual Chunk generate() = 0;
/// This method can be called at most once per consume() call, in case a ready chunk should be pushed immediately.
void setReadyChunk(Chunk chunk);
public:
IAccumulatingTransform(Block input_header, Block output_header);
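For orientation, a hedged sketch of how a subclass is expected to use this interface: accumulate in consume(), emit from generate() until an empty chunk signals the end. The class below is illustrative only (not part of the commit) and relies solely on the members visible in this diff.

#include <Processors/IAccumulatingTransform.h>

namespace DB
{

/// Buffers all input chunks and returns them unchanged, one per generate() call.
class BufferingTransform : public IAccumulatingTransform
{
public:
    explicit BufferingTransform(Block header) : IAccumulatingTransform(header, header) {}

    String getName() const override { return "BufferingTransform"; }

protected:
    void consume(Chunk chunk) override { buffer.emplace_back(std::move(chunk)); }

    Chunk generate() override
    {
        if (next == buffer.size())
            return {};   /// empty chunk: nothing more to emit
        return std::move(buffer[next++]);
    }

private:
    Chunks buffer;
    size_t next = 0;
};

}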

View File

@@ -0,0 +1,331 @@
#include <Processors/Transforms/MergeSortingTransform.h>
#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
#include <Common/formatReadable.h>
#include <Common/ProfileEvents.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <queue>
namespace ProfileEvents
{
extern const Event ExternalSortWritePart;
extern const Event ExternalSortMerge;
}
namespace DB
{
/** Part of the implementation. Merges an array of ready (already read from somewhere) chunks.
* Returns the result of the merge as a stream of chunks, with no more than 'max_merged_block_size' rows in each.
*/
class MergeSorter
{
public:
MergeSorter(Chunks & chunks_, SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_ = 0);
Chunk read();
private:
Chunks & chunks;
SortDescription description;
size_t max_merged_block_size;
UInt64 limit;
size_t total_merged_rows = 0;
using CursorImpls = std::vector<SortCursorImpl>;
CursorImpls cursors;
bool has_collation = false;
std::priority_queue<SortCursor> queue_without_collation;
std::priority_queue<SortCursorWithCollation> queue_with_collation;
/** Two different cursors are supported - with and without Collation.
* Templates are used (instead of virtual functions in SortCursor) for zero overhead.
*/
template <typename TSortCursor>
Chunk mergeImpl(std::priority_queue<TSortCursor> & queue);
};
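MergeSorter is a textbook k-way merge: one cursor per sorted input sits in a priority queue keyed by its current row; the smallest is popped, its row is emitted, and the advanced cursor is pushed back. A standalone sketch of the same loop over plain vectors (no ClickHouse types; Cursor and Greater are illustrative names):

#include <iostream>
#include <queue>
#include <vector>

struct Cursor
{
    const std::vector<int> * data;
    size_t pos = 0;

    int value() const { return (*data)[pos]; }
    bool isLast() const { return pos + 1 == data->size(); }
};

/// std::priority_queue is a max-heap, so invert the comparison to pop the
/// smallest current value first (SortCursor's operator< plays the same role).
struct Greater
{
    bool operator()(const Cursor & lhs, const Cursor & rhs) const { return lhs.value() > rhs.value(); }
};

int main()
{
    std::vector<std::vector<int>> inputs = {{1, 4, 7}, {2, 5, 8}, {3, 6, 9}};

    std::priority_queue<Cursor, std::vector<Cursor>, Greater> queue;
    for (const auto & input : inputs)
        if (!input.empty())
            queue.push(Cursor{&input, 0});

    while (!queue.empty())
    {
        Cursor current = queue.top();
        queue.pop();
        std::cout << current.value() << ' ';

        if (!current.isLast())   /// advance the cursor and put it back, as mergeImpl does
        {
            ++current.pos;
            queue.push(current);
        }
    }
    std::cout << '\n';   /// prints: 1 2 3 4 5 6 7 8 9
}

The same invariant drives mergeImpl below: the queue top is always the globally smallest unmerged row, so output is produced in sorted order at O(log k) cost per row.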
MergeSorter::MergeSorter(Chunks & chunks_, SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_)
: chunks(chunks_), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
{
Chunks nonempty_chunks;
for (auto & chunk : chunks)
{
if (chunk.getNumRows() == 0)
continue;
cursors.emplace_back(chunk.getColumns(), description);
has_collation |= cursors.back().has_collation;
nonempty_chunks.emplace_back(std::move(chunk));
}
chunks.swap(nonempty_chunks);
if (!has_collation)
{
for (auto & cursor : cursors)
queue_without_collation.push(SortCursor(&cursor));
}
else
{
for (auto & cursor : cursors)
queue_with_collation.push(SortCursorWithCollation(&cursor));
}
}
Chunk MergeSorter::read()
{
if (chunks.empty())
return Chunk();
if (chunks.size() == 1)
{
auto res = std::move(chunks[0]);
chunks.clear();
return res;
}
return !has_collation
? mergeImpl<SortCursor>(queue_without_collation)
: mergeImpl<SortCursorWithCollation>(queue_with_collation);
}
template <typename TSortCursor>
Chunk MergeSorter::mergeImpl(std::priority_queue<TSortCursor> & queue)
{
size_t num_columns = chunks[0].getNumColumns();
MutableColumns merged_columns = chunks[0].cloneEmptyColumns();
/// TODO: reserve (in each column)
/// Take rows from the queue in the right order and push them into 'merged_columns'.
size_t merged_rows = 0;
while (!queue.empty())
{
TSortCursor current = queue.top();
queue.pop();
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
++total_merged_rows;
++merged_rows;
if (!current->isLast())
{
current->next();
queue.push(current);
}
if (limit && total_merged_rows == limit)
{
chunks.clear();
return Chunk(std::move(merged_columns), merged_rows);
}
if (merged_rows == max_merged_block_size)
return Chunk(std::move(merged_columns), merged_rows);
}
if (merged_rows == 0)
return {};
return Chunk(std::move(merged_columns), merged_rows);
}
MergeSortingTransform::MergeSortingTransform(
const Block & header,
SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
: IAccumulatingTransform(header, header)
, description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
, max_bytes_before_remerge(max_bytes_before_remerge_)
, max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
{
auto & sample = getInputPort().getHeader();
/// Replace column names with column positions in the sort description.
for (auto & column_description : description)
{
if (!column_description.column_name.empty())
{
column_description.column_number = sample.getPositionByName(column_description.column_name);
column_description.column_name.clear();
}
}
/// Remove constants from header and map old indexes to new.
size_t num_columns = sample.columns();
ColumnNumbers map(num_columns, num_columns);
const_columns_to_remove.assign(num_columns, true);
for (size_t pos = 0; pos < num_columns; ++pos)
{
const auto & column = sample.getByPosition(pos);
if (!(column.column && column.column->isColumnConst()))
{
map[pos] = header_without_constants.columns();
header_without_constants.insert(column);
const_columns_to_remove[pos] = false;
}
}
/// Remove constants from column_description and remap positions.
SortDescription description_without_constants;
description_without_constants.reserve(description.size());
for (const auto & column_description : description)
{
auto old_pos = column_description.column_number;
auto new_pos = map[old_pos];
if (new_pos < num_columns)
{
description_without_constants.push_back(column_description);
description_without_constants.back().column_number = new_pos;
}
}
description.swap(description_without_constants);
}
void MergeSortingTransform::consume(Chunk chunk)
{
/** Algorithm:
* - read blocks from the source stream into memory;
* - if there are too many of them and external sorting is enabled,
* - merge all accumulated blocks into one sorted stream and write it to a temporary file;
* - at the end, merge all sorted streams from the temporary files and also from the rest of the blocks in memory.
*/
/// If there were only const columns in sort description, then there is no need to sort.
/// Return the chunk as is.
if (description.empty())
{
setReadyChunk(std::move(chunk));
return;
}
removeConstColumns(chunk);
sum_rows_in_blocks += chunk.getNumRows();
sum_bytes_in_blocks += chunk.allocatedBytes();
chunks.push_back(std::move(chunk));
/** If a significant amount of data was accumulated, perform a preliminary merging step.
*/
if (chunks.size() > 1
&& limit
&& limit * 2 < sum_rows_in_blocks /// 2 is just a guess.
&& remerge_is_useful
&& max_bytes_before_remerge
&& sum_bytes_in_blocks > max_bytes_before_remerge)
{
remerge();
}
/** If there are too many blocks and external sorting is enabled,
* merge the blocks currently in memory and write the merged stream to a temporary (compressed) file.
* NOTE. It's possible to check free space in the filesystem here.
*/
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
Poco::File(tmp_path).createDirectories();
temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
const std::string & path = temporary_files.back()->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
NativeBlockOutputStream block_out(compressed_buf, 0, header_without_constants);
MergeSorter merge_sorter(chunks, description, max_merged_block_size, limit);
LOG_INFO(log, "Sorting and writing part of data into temporary file " + path);
ProfileEvents::increment(ProfileEvents::ExternalSortWritePart);
/// NOTE. Possibly limit disk usage.
/// NOTE. This should be a separate processor.
/// TODO: Rewrite this code when processors are able to create other processors.
block_out.writePrefix();
while (auto next = merge_sorter.read())
{
auto block = header_without_constants.cloneWithColumns(next.detachColumns());
block_out.write(block);
}
block_out.writeSuffix();
LOG_INFO(log, "Done writing part of data into temporary file " + path);
chunks.clear();
sum_bytes_in_blocks = 0;
sum_rows_in_blocks = 0;
}
}
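The budgeting in consume() reduces to: accumulate chunks, track their byte size, and once the external-sort threshold is exceeded, merge everything in memory into one sorted run and spill it. A hedged standalone sketch (plain vectors stand in for chunks and temporary files; the final merge of spilled runs with the in-memory remainder, which happens in generate(), is omitted):

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

struct ExternalSortSketch
{
    size_t max_bytes = 64;                        /// stands in for max_bytes_before_external_sort
    size_t sum_bytes = 0;
    std::vector<std::vector<int>> in_memory;      /// accumulated chunks
    std::vector<std::vector<int>> spilled_runs;   /// stands in for temporary_files

    void consume(std::vector<int> chunk)
    {
        sum_bytes += chunk.size() * sizeof(int);
        in_memory.push_back(std::move(chunk));
        if (sum_bytes > max_bytes)
        {
            /// Merge all in-memory chunks into one sorted run and "spill" it.
            std::vector<int> run;
            for (auto & c : in_memory)
                run.insert(run.end(), c.begin(), c.end());
            std::sort(run.begin(), run.end());
            spilled_runs.push_back(std::move(run));
            in_memory.clear();
            sum_bytes = 0;
        }
    }
};

int main()
{
    ExternalSortSketch sorter;
    for (int i = 0; i < 10; ++i)
        sorter.consume({9 - i, i});               /// 20 ints = 80 bytes > 64, forces one spill
    assert(!sorter.spilled_runs.empty());         /// at least one run went "to disk"
}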
void MergeSortingTransform::remerge()
{
LOG_DEBUG(log, "Re-merging intermediate ORDER BY data (" << chunks.size() << " blocks with " << sum_rows_in_blocks << " rows) to save memory consumption");
/// NOTE Maybe concat all blocks and partial sort will be faster than merge?
MergeSorter merge_sorter(chunks, description, max_merged_block_size, limit);
Chunks new_chunks;
size_t new_sum_rows_in_blocks = 0;
size_t new_sum_bytes_in_blocks = 0;
while (auto chunk = merge_sorter.read())
{
new_sum_rows_in_blocks += chunk.getNumRows();
new_sum_bytes_in_blocks += chunk.allocatedBytes();
new_chunks.emplace_back(std::move(chunk));
}
LOG_DEBUG(log, "Memory usage is lowered from "
<< formatReadableSizeWithBinarySuffix(sum_bytes_in_blocks) << " to "
<< formatReadableSizeWithBinarySuffix(new_sum_bytes_in_blocks));
/// If the memory consumption was not lowered enough - we will not perform remerge anymore. 2 is a guess.
if (new_sum_bytes_in_blocks * 2 > sum_bytes_in_blocks)
remerge_is_useful = false;
chunks = std::move(new_chunks);
sum_rows_in_blocks = new_sum_rows_in_blocks;
sum_bytes_in_blocks = new_sum_bytes_in_blocks;
}
void MergeSortingTransform::removeConstColumns(Chunk & chunk)
{
size_t num_columns = chunk.getNumColumns();
size_t num_rows = chunk.getNumRows();
if (num_columns != const_columns_to_remove.size())
throw Exception("Chunk has a different number of columns than the header: " + toString(num_columns)
+ " vs " + toString(const_columns_to_remove.size()), ErrorCodes::LOGICAL_ERROR);
auto columns = chunk.detachColumns();
Columns columns_without_constants;
columns_without_constants.reserve(header_without_constants.columns());
for (size_t position = 0; position < num_columns; ++position)
{
if (!const_columns_to_remove[position])
columns_without_constants.push_back(std::move(columns[position]));
}
chunk.setColumns(std::move(columns_without_constants), num_rows);
}
}

View File

@@ -0,0 +1,82 @@
#pragma once
#include <Processors/IAccumulatingTransform.h>
#include <Core/SortDescription.h>
#include <Poco/TemporaryFile.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <common/logger_useful.h>
#include <queue>
namespace DB
{
class MergeSortingTransform : public IAccumulatingTransform
{
public:
/// limit - if not 0, it is allowed to return only the first 'limit' rows in sorted order.
MergeSortingTransform(const Block & header,
SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_);
String getName() const override { return "MergeSortingTransform"; }
protected:
void consume(Chunk chunk) override;
Chunk generate() override;
private:
SortDescription description;
size_t max_merged_block_size;
UInt64 limit;
size_t max_bytes_before_remerge;
size_t max_bytes_before_external_sort;
const std::string tmp_path;
Logger * log = &Logger::get("MergeSortingTransform");
Chunks chunks;
size_t sum_rows_in_blocks = 0;
size_t sum_bytes_in_blocks = 0;
/// If remerge doesn't reduce memory usage at least twofold, mark it as useless and don't do it anymore.
bool remerge_is_useful = true;
/// Before the operation, constant columns are removed from blocks; afterwards, they are placed back
/// (to avoid excessive virtual function calls and because constants cannot be serialized in Native format for temporary files).
/// The block structure without constants is stored here.
Block header_without_constants;
/// Columns which were constant in header and we need to remove from chunks.
std::vector<bool> const_columns_to_remove;
/// Everything below is for external sorting.
std::vector<std::unique_ptr<Poco::TemporaryFile>> temporary_files;
/// For reading data from temporary file.
struct TemporaryFileStream
{
ReadBufferFromFile file_in;
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
TemporaryFileStream(const std::string & path, const Block & header)
: file_in(path), compressed_in(file_in), block_in(std::make_shared<NativeBlockInputStream>(compressed_in, header, 0)) {}
};
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
BlockInputStreams inputs_to_merge;
/// Merge all accumulated blocks to keep no more than limit rows.
void remerge();
void removeConstColumns(Chunk & chunk);
};
}

View File

@@ -0,0 +1,302 @@
#include <Processors/Transforms/MergingSortedTransform.h>
#include <DataStreams/ColumnGathererStream.h>
#include <IO/WriteBuffer.h>
namespace DB
{
MergingSortedTransform::MergingSortedTransform(
const Block & header,
size_t num_inputs,
const SortDescription & description_,
size_t max_block_size,
UInt64 limit,
bool quiet)
: IProcessor(InputPorts(num_inputs, header), {header})
, description(description_), max_block_size(max_block_size), limit(limit), quiet(quiet)
, merged_data(header), source_chunks(num_inputs), cursors(num_inputs)
{
auto & sample = outputs[0].getHeader();
/// Replace column names in the description with positions.
for (auto & column_description : description)
{
has_collation |= column_description.collator != nullptr;
if (!column_description.column_name.empty())
{
column_description.column_number = sample.getPositionByName(column_description.column_name);
column_description.column_name.clear();
}
}
}
IProcessor::Status MergingSortedTransform::prepare()
{
auto & output = outputs[0];
/// Special case for no inputs.
if (inputs.empty())
{
output.finish();
return Status::Finished;
}
/// Check can output.
if (output.isFinished())
{
for (auto & in : inputs)
in.close();
return Status::Finished;
}
if (!output.isNeeded())
{
for (auto & in : inputs)
in.setNotNeeded();
return Status::PortFull;
}
if (output.hasData())
return Status::PortFull;
/// Special case for single input.
if (inputs.size() == 1)
{
auto & input = inputs[0];
if (input.isFinished())
{
output.finish();
return Status::Finished;
}
input.setNeeded();
if (input.hasData())
output.push(input.pull());
return Status::NeedData;
}
/// Push if has data.
if (merged_data.mergedRows())
{
output.push(merged_data.pull());
return Status::PortFull;
}
if (!is_initialized)
{
/// Check for inputs we need.
bool all_inputs_have_data = true;
for (size_t i = 0; i < inputs.size(); ++i)
{
if (inputs[i].isFinished())
continue;
if (!cursors[i].empty())
{
inputs[i].setNotNeeded();
continue;
}
inputs[i].setNeeded();
if (!inputs[i].hasData())
{
all_inputs_have_data = false;
continue;
}
auto chunk = inputs[i].pull();
if (chunk.hasNoRows())
{
all_inputs_have_data = false;
continue;
}
updateCursor(std::move(chunk), i);
}
if (!all_inputs_have_data)
return Status::NeedData;
if (has_collation)
initQueue(queue_with_collation);
else
initQueue(queue_without_collation);
is_initialized = true;
return Status::Ready;
}
else
{
if (is_finished)
{
for (auto & input : inputs)
input.close();
outputs[0].finish();
return Status::Finished;
}
if (need_data)
{
auto & input = inputs[next_input_to_read];
if (!input.isFinished())
{
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
updateCursor(input.pull(), next_input_to_read);
pushToQueue(next_input_to_read);
need_data = false;
}
}
return Status::Ready;
}
}
void MergingSortedTransform::work()
{
if (has_collation)
merge(queue_with_collation);
else
merge(queue_without_collation);
}
template <typename TSortCursor>
void MergingSortedTransform::merge(std::priority_queue<TSortCursor> & queue)
{
/// Returns true if one more row may be read now; returns false otherwise (and sets is_finished when the limit is reached).
auto can_read_another_row = [&, this]()
{
if (limit && merged_data.totalMergedRows() >= limit)
{
//std::cerr << "Limit reached\n";
is_finished = true;
return false;
}
if (merged_data.mergedRows() >= max_block_size)
{
//std::cerr << "max_block_size reached\n";
return false;
}
return true;
};
/// Take rows in the required order and put them into `merged_data` while the number of rows does not exceed `max_block_size`.
while (!queue.empty())
{
/// Shouldn't happen at first iteration, but check just in case.
if (!can_read_another_row())
return;
TSortCursor current = queue.top();
queue.pop();
bool first_iteration = true;
while (true)
{
if (!first_iteration && !can_read_another_row())
{
queue.push(current);
return;
}
first_iteration = false;
/** What if the current cursor's block is totally less than or equal to the rest,
* or it is the only data source left in the queue? Then the entire block of the current cursor can be taken at once.
*/
if (current.impl->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top())))
{
//std::cerr << "current block is totally less or equals\n";
/// If there is already data in the current block, return it first. We'll get here again on the next call to the merge function.
if (merged_data.mergedRows() != 0)
{
//std::cerr << "merged rows is non-zero\n";
queue.push(current);
return;
}
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
size_t source_num = current.impl->order;
insertFromChunk(source_num);
return;
}
//std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
//std::cerr << "Inserting row\n";
merged_data.insertRow(current->all_columns, current->pos);
if (out_row_sources_buf)
{
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
RowSourcePart row_source(current.impl->order);
out_row_sources_buf->write(row_source.data);
}
if (current->isLast())
{
need_data = true;
next_input_to_read = current.impl->order;
if (limit && merged_data.totalMergedRows() >= limit)
is_finished = true;
return;
}
//std::cerr << "moving to next row\n";
current->next();
if (!queue.empty() && current.greater(queue.top()))
{
//std::cerr << "next row is not least, pushing back to queue\n";
queue.push(current);
break;
}
}
}
}
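The totallyLessOrEquals branch above deserves a note: since every source is sorted, if the current source's largest remaining row does not exceed the smallest current row of all other sources, its entire tail can be appended in one step (insertFromChunk) instead of row by row. A standalone sketch of the check:

#include <cassert>
#include <vector>

int main()
{
    std::vector<int> current = {1, 2, 3};   /// current cursor's sorted block, cursor at position 0
    int queue_top_front = 5;                /// smallest current row among the other sources
    std::vector<int> merged;

    /// current.back() is the largest remaining row; if even it does not exceed
    /// the other sources' minimum, copy the whole tail at once.
    if (current.back() <= queue_top_front)
        merged.insert(merged.end(), current.begin(), current.end());

    assert((merged == std::vector<int>{1, 2, 3}));
}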
void MergingSortedTransform::insertFromChunk(size_t source_num)
{
if (source_num >= cursors.size())
throw Exception("Logical error in MergingSortedTrandform", ErrorCodes::LOGICAL_ERROR);
//std::cerr << "copied columns\n";
auto num_rows = source_chunks[source_num]->getNumRows();
UInt64 total_merged_rows_after_insertion = merged_data.mergedRows() + num_rows;
if (limit && total_merged_rows_after_insertion > limit)
{
num_rows -= total_merged_rows_after_insertion - limit; /// keep only the rows that fit under the limit
merged_data.insertFromChunk(std::move(*source_chunks[source_num]), num_rows);
is_finished = true;
}
else
{
merged_data.insertFromChunk(std::move(*source_chunks[source_num]), 0);
need_data = true;
next_input_to_read = source_num;
}
if (out_row_sources_buf)
{
RowSourcePart row_source(source_num);
for (size_t i = 0; i < num_rows; ++i)
out_row_sources_buf->write(row_source.data);
}
}
}

View File

@@ -0,0 +1,250 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
#include <queue>
namespace DB
{
/// Allows you to refer to a row in a block and hold the block's ownership,
/// and thus avoid creating a temporary row object.
/// Does not use std::shared_ptr, since there is no need for a place for `weak_count` and `deleter`;
/// does not use Poco::SharedPtr, since we want the block and the `refcount` allocated in one piece;
/// does not use Poco::AutoPtr, since it does not have a `move` constructor and there are extra checks for nullptr.
/// The reference counter is not atomic, since it is used from a single thread.
namespace detail
{
struct SharedChunk : Chunk
{
int refcount = 0;
ColumnRawPtrs all_columns;
ColumnRawPtrs sort_columns;
SharedChunk(Chunk && chunk) : Chunk(std::move(chunk)) {}
};
//template <typename TSortCursor>
//class Queue
//{
//public:
// bool empty() const { return queue.empty(); }
// void push(TSortCursor cursor) { queue.push(std::move(cursor)); }
//
// bool needUpdateCursor() const { return !empty() && !queue.top()->isLast(); }
//
// void updateCursor(TSortCursor cursor)
// {
// if (!needUpdateCursor())
// throw Exception("Do not need to update cursor for sort cursor queue.", ErrorCodes::LOGICAL_ERROR);
//
// if (cursor->order != queue.top()->order)
// throw Exception("Cannot update cursor for sort cursor queue because top cursor order "
// "(" + toString(queue.top()->order) + ") is not equal to new cursor order "
// "(" + toString(cursor->order) + ").", ErrorCodes::LOGICAL_ERROR);
// queue.pop();
// queue.push(cursor);
// }
//
// void dropCursor()
// {
// if (!needUpdateCursor())
// throw Exception("Do not need to update cursor for sort cursor queue.", ErrorCodes::LOGICAL_ERROR);
//
// queue.pop();
// }
//
// const TSortCursor & top() const
// {
// if (needUpdateCursor())
// throw Exception("Cannot get top element from sort cursor queue because "
// "need to update cursor.", ErrorCodes::LOGICAL_ERROR);
//
// return queue.top();
// }
//
// void pop()
// {
// if (needUpdateCursor())
// throw Exception("Cannot pop element from sort cursor queue because "
// "need to update cursor.", ErrorCodes::LOGICAL_ERROR);
// queue.pop();
// }
//
//private:
// /// Queue with SortCursors.
// using PriorityQueue = std::priority_queue<TSortCursor>;
// PriorityQueue queue;
//};
}
using SharedChunkPtr = boost::intrusive_ptr<detail::SharedChunk>;
inline void intrusive_ptr_add_ref(detail::SharedChunk * ptr)
{
++ptr->refcount;
}
inline void intrusive_ptr_release(detail::SharedChunk * ptr)
{
if (0 == --ptr->refcount)
delete ptr;
}
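A hedged standalone sketch of the same intrusive-refcount pattern: boost::intrusive_ptr finds the two free functions above via argument-dependent lookup, so the object carries its own counter (non-atomic here, hence single-thread only, as noted above). The Shared type is illustrative.

#include <boost/intrusive_ptr.hpp>
#include <iostream>

struct Shared
{
    int refcount = 0;
    int payload = 42;
};

inline void intrusive_ptr_add_ref(Shared * ptr) { ++ptr->refcount; }
inline void intrusive_ptr_release(Shared * ptr)
{
    if (--ptr->refcount == 0)
        delete ptr;
}

int main()
{
    boost::intrusive_ptr<Shared> a(new Shared);   /// refcount = 1
    {
        boost::intrusive_ptr<Shared> b = a;       /// refcount = 2
        std::cout << b->payload << '\n';
    }                                             /// b destroyed, refcount = 1
}                                                 /// a destroyed, object deleted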
class MergingSortedTransform : public IProcessor
{
public:
MergingSortedTransform(
const Block & header,
size_t num_inputs,
const SortDescription & description_,
size_t max_block_size,
UInt64 limit = 0,
bool quiet = false);
String getName() const override { return "MergingSortedTransform"; }
Status prepare() override;
void work() override;
protected:
class MergedData
{
public:
explicit MergedData(const Block & header)
{
columns.reserve(header.columns());
for (const auto & column : header)
columns.emplace_back(column.type->createColumn());
}
void insertRow(const ColumnRawPtrs & raw_columns, size_t row)
{
size_t num_columns = raw_columns.size();
for (size_t i = 0; i < num_columns; ++i)
columns[i]->insertFrom(*raw_columns[i], row);
++total_merged_rows;
++merged_rows;
}
void insertFromChunk(Chunk && chunk, size_t limit)
{
if (merged_rows)
throw Exception("Cannot insert to MergedData from Chunk because MergedData is not empty.",
ErrorCodes::LOGICAL_ERROR);
auto num_rows = chunk.getNumRows();
columns = chunk.mutateColumns();
if (limit && num_rows > limit)
for (auto & column : columns)
column = (*column->cut(0, limit)).mutate();
total_merged_rows += num_rows;
merged_rows = num_rows;
}
Chunk pull()
{
MutableColumns empty_columns;
empty_columns.reserve(columns.size());
for (const auto & column : columns)
empty_columns.emplace_back(column->cloneEmpty());
empty_columns.swap(columns);
Chunk chunk(std::move(empty_columns), merged_rows);
merged_rows = 0;
return chunk;
}
UInt64 totalMergedRows() const { return total_merged_rows; }
UInt64 mergedRows() const { return merged_rows; }
private:
UInt64 total_merged_rows = 0;
UInt64 merged_rows = 0;
MutableColumns columns;
};
/// Settings
SortDescription description;
const size_t max_block_size;
UInt64 limit;
bool has_collation = false;
bool quiet = false;
MergedData merged_data;
/// Used in Vertical merge algorithm to gather non-PK/non-index columns (on the next step).
/// If it is not nullptr, it is populated during execution.
WriteBuffer * out_row_sources_buf = nullptr;
/// Chunks currently being merged.
std::vector<SharedChunkPtr> source_chunks;
using CursorImpls = std::vector<SortCursorImpl>;
CursorImpls cursors;
using Queue = std::priority_queue<SortCursor>;
Queue queue_without_collation;
using QueueWithCollation = std::priority_queue<SortCursorWithCollation>;
QueueWithCollation queue_with_collation;
private:
/// Processor state.
bool is_initialized = false;
bool is_finished = false;
bool need_data = false;
size_t next_input_to_read = 0;
template <typename TSortCursor>
void merge(std::priority_queue<TSortCursor> & queue);
void insertFromChunk(size_t source_num);
void updateCursor(Chunk chunk, size_t source_num)
{
auto & shared_chunk_ptr = source_chunks[source_num];
if (!shared_chunk_ptr)
{
shared_chunk_ptr = new detail::SharedChunk(std::move(chunk));
cursors[source_num] = SortCursorImpl(shared_chunk_ptr->getColumns(), description, source_num);
has_collation |= cursors[source_num].has_collation;
}
else
{
*shared_chunk_ptr = std::move(chunk);
cursors[source_num].reset(shared_chunk_ptr->getColumns(), {});
}
shared_chunk_ptr->all_columns = cursors[source_num].all_columns;
shared_chunk_ptr->sort_columns = cursors[source_num].sort_columns;
}
void pushToQueue(size_t source_num)
{
if (has_collation)
queue_with_collation.push(SortCursorWithCollation(&cursors[source_num]));
else
queue_without_collation.push(SortCursor(&cursors[source_num]));
}
template <typename TSortCursor>
void initQueue(std::priority_queue<TSortCursor> & queue)
{
for (auto & cursor : cursors)
if (!cursor.empty())
queue.push(TSortCursor(&cursor));
}
};
}
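One detail of MergedData::pull() above is worth isolating: it clones empty columns of the same types, swaps them with the filled ones, and hands the filled set out, so the accumulator object is reused across output chunks without being reallocated. The idiom in miniature, with a plain vector standing in for a column:

#include <cassert>
#include <vector>

int main()
{
    std::vector<int> column = {1, 2, 3};   /// filled during merging
    std::vector<int> fresh;                /// cloneEmpty() analogue: same type, no data
    fresh.swap(column);                    /// hand out the data, keep an empty buffer

    assert(column.empty());                /// ready to accumulate the next chunk
    assert((fresh == std::vector<int>{1, 2, 3}));
}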

View File

@@ -0,0 +1,22 @@
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Interpreters/sortBlock.h>
namespace DB
{
PartialSortingTransform::PartialSortingTransform(const Block & header, SortDescription & description_, UInt64 limit_)
: ISimpleTransform(header, header, false)
, description(description_), limit(limit_)
{
}
void PartialSortingTransform::transform(Chunk & chunk)
{
auto num_rows = chunk.getNumRows();
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
chunk.clear();
sortBlock(block, description, limit);
chunk.setColumns(block.getColumns(), num_rows);
}
}
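When a limit is given, a block does not need to be fully sorted: it is enough that the first `limit` rows land in their final positions, which is exactly the guarantee std::partial_sort provides. A standalone sketch (sortBlock with a limit plays the analogous role in the code above):

#include <algorithm>
#include <iostream>
#include <vector>

int main()
{
    std::vector<int> block = {9, 1, 8, 2, 7, 3};
    size_t limit = 3;

    /// Only [begin, begin + limit) ends up sorted; the tail is in unspecified order.
    std::partial_sort(block.begin(), block.begin() + limit, block.end());

    for (size_t i = 0; i < limit; ++i)   /// prints: 1 2 3
        std::cout << block[i] << ' ';
    std::cout << '\n';
}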

View File

@@ -0,0 +1,27 @@
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Core/SortDescription.h>
namespace DB
{
/** Sorts each block individually by the values of the specified columns.
* At the moment, a not very optimal algorithm is used.
*/
class PartialSortingTransform : public ISimpleTransform
{
public:
/// limit - if not 0, each block may be sorted only partially: it is enough that only the first `limit` rows are in order.
PartialSortingTransform(const Block & header, SortDescription & description, UInt64 limit = 0);
String getName() const override { return "PartialSortingTransform"; }
protected:
void transform(Chunk & chunk) override;
private:
SortDescription description;
UInt64 limit;
};
}