Updated MergeSortingTransform.

Nikolai Kochetov 2019-02-28 18:21:55 +03:00
parent 19fa17fd84
commit fd25fab652
2 changed files with 366 additions and 47 deletions


@@ -13,6 +13,8 @@
#include <DataStreams/NativeBlockOutputStream.h>
#include <queue>
#include <Processors/ISource.h>
#include <Processors/Transforms/MergingSortedTransform.h>
namespace ProfileEvents
@@ -25,18 +27,92 @@ namespace ProfileEvents
namespace DB
{
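/// Accumulating transform that spills data to disk: every chunk it consumes is written to a
/// temporary file as a compressed native-format stream; generate() only finalizes the file.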
class SinkToNativeStream : public IAccumulatingTransform
{
public:
SinkToNativeStream(const Block & header, Logger * log_, std::string path_)
: IAccumulatingTransform(header, header), log(log_)
, path(std::move(path_)), file_buf(path), compressed_buf(file_buf)
, stream(std::make_shared<NativeBlockOutputStream>(compressed_buf, 0, header))
{
LOG_INFO(log, "Sorting and writing part of data into temporary file " + path);
ProfileEvents::increment(ProfileEvents::ExternalSortWritePart);
stream->writePrefix();
}
String getName() const override { return "SinkToNativeStream"; }
void consume(Chunk chunk) override
{
stream->write(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
}
Chunk generate() override
{
if (stream)
{
stream->writeSuffix();
LOG_INFO(log, "Done writing part of data into temporary file " + path);
}
stream.reset();
return Chunk();
}
private:
Logger * log;
std::string path;
WriteBufferFromFile file_buf;
CompressedWriteBuffer compressed_buf;
BlockOutputStreamPtr stream;
};
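/// Source that reads spilled data back: streams blocks from a temporary file in the compressed
/// native format and turns each block into a chunk.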
class SourceFromNativeStream : public ISource
{
public:
SourceFromNativeStream(const Block & header, const std::string & path)
: ISource(header), file_in(path), compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, header, 0))
{
block_in->readPrefix();
}
String getName() const override { return "SourceFromNativeStream"; }
Chunk generate() override
{
if (!block_in)
return {};
auto block = block_in->read();
if (!block)
{
block_in.reset();
return {};
}
UInt64 num_rows = block.rows();
return Chunk(block.getColumns(), num_rows);
}
private:
ReadBufferFromFile file_in;
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
};
/** Part of the implementation. Merges an array of ready (already read from somewhere) chunks.
 * Returns the result of the merge as a stream of chunks, each with no more than 'max_merged_block_size' rows.
 */
class MergeSorter
{
public:
MergeSorter(Chunks & chunks_, SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_ = 0);
MergeSorter(Chunks chunks_, SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_);
Chunk read();
private:
Chunks & chunks;
Chunks chunks;
SortDescription description;
size_t max_merged_block_size;
UInt64 limit;
@@ -57,8 +133,23 @@ private:
Chunk mergeImpl(std::priority_queue<TSortCursor> & queue);
};
MergeSorter::MergeSorter(Chunks & chunks_, SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_)
: chunks(chunks_), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
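/// Source adapter over MergeSorter: generate() returns the merged chunks one by one.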
class MergeSorterSource : public ISource
{
public:
MergeSorterSource(Block header, Chunks chunks, SortDescription & description, size_t max_merged_block_size, UInt64 limit)
: ISource(std::move(header)), merge_sorter(std::move(chunks), description, max_merged_block_size, limit) {}
String getName() const override { return "MergeSorterSource"; }
protected:
Chunk generate() override { return merge_sorter.read(); }
private:
MergeSorter merge_sorter;
};
MergeSorter::MergeSorter(Chunks chunks_, SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_)
: chunks(std::move(chunks_)), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
{
Chunks nonempty_chunks;
for (auto & chunk : chunks)
@@ -155,12 +246,12 @@ MergeSortingTransform::MergeSortingTransform(
size_t max_merged_block_size_, UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
: IAccumulatingTransform(header, header)
: IProcessor({header}, {header})
, description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
, max_bytes_before_remerge(max_bytes_before_remerge_)
, max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
{
auto & sample = getInputPort().getHeader();
auto & sample = inputs.front().getHeader();
/// Replace column names to column position in sort_description.
for (auto & column_description : description)
@@ -202,6 +293,191 @@ MergeSortingTransform::MergeSortingTransform(
description.swap(description_without_constants);
}
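/// State machine: the transform alternates between Consume and Serialize (one Serialize pass per
/// spill to a temporary file) and switches to Generate once the input is exhausted.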
IProcessor::Status MergeSortingTransform::prepare()
{
if (stage == Stage::Serialize)
{
if (current_processor)
return Status::ExpandPipeline;
auto status = prepareSerialize();
if (status != Status::Finished)
return status;
stage = Stage::Consume;
}
if (stage == Stage::Consume)
{
auto status = prepareConsume();
if (status != Status::Finished)
return status;
stage = Stage::Generate;
}
/// stage == Stage::Generate
if (!generated_prefix)
return Status::Ready;
if (!processors.empty())
return Status::ExpandPipeline;
return prepareGenerate();
}
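/// Consume stage: push the previously generated chunk (if any) to the main output and pull the
/// next chunk from the main input; work() will pass it to consume().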
IProcessor::Status MergeSortingTransform::prepareConsume()
{
auto & input = inputs.front();
auto & output = outputs.front();
/// Check can output.
if (output.isFinished())
{
input.close();
return Status::Finished;
}
if (!output.canPush())
{
input.setNotNeeded();
return Status::PortFull;
}
if (generated_chunk)
output.push(std::move(generated_chunk));
/// Check can input.
if (!current_chunk)
{
if (input.isFinished())
return Status::Finished;
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
current_chunk = input.pull();
}
/// Now consume.
return Status::Ready;
}
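/// Serialize stage: chunks produced by serialize() are pushed through the extra output port to the
/// attached SinkToNativeStream; once merge_sorter is drained, the port is finished and the sink's
/// feedback input is closed.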
IProcessor::Status MergeSortingTransform::prepareSerialize()
{
auto & input = inputs.back();
auto & output = outputs.back();
if (output.isFinished())
{
input.close();
return Status::Finished;
}
if (!output.canPush())
{
input.setNotNeeded();
return Status::PortFull;
}
if (current_chunk)
output.push(std::move(current_chunk));
if (merge_sorter)
return Status::Ready;
output.finish();
if (input.isFinished())
return Status::Finished;
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
input.pull();
input.close();
return Status::Finished;
}
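/// Generate stage: if an in-memory merge_sorter exists, its chunks are pushed to the main output
/// directly; otherwise the sorted result is pulled from the extra input fed by MergingSortedTransform.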
IProcessor::Status MergeSortingTransform::prepareGenerate()
{
auto & output = outputs.front();
if (output.isFinished())
return Status::Finished;
if (!output.canPush())
return Status::PortFull;
if (merge_sorter)
{
if (!current_chunk)
return Status::Ready;
output.push(std::move(current_chunk));
return Status::PortFull;
}
else
{
auto & input = inputs.back();
if (input.isFinished())
{
output.finish();
return Status::Finished;
}
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
output.push(input.pull());
return Status::PortFull;
}
}
void MergeSortingTransform::work()
{
if (stage == Stage::Consume)
consume(std::move(current_chunk));
if (stage == Stage::Serialize)
serialize();
if (stage == Stage::Generate)
generate();
}
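/// Called whenever a sub-pipeline has to be attached: before each Serialize stage it wires the
/// SinkToNativeStream through an extra input/output pair; before Generate it attaches the merging
/// pipeline built in generate() through a single extra input.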
Processors MergeSortingTransform::expandPipeline()
{
if (!processors.empty())
{
/// Before generate.
inputs.emplace_back(header_without_constants, this);
connect(current_processor->getOutputs().front(), getInputs().back());
current_processor.reset();
inputs.back().setNeeded();
return std::move(processors);
}
else
{
/// Before serialize.
inputs.emplace_back(header_without_constants, this);
outputs.emplace_back(header_without_constants, this);
connect(current_processor->getOutputs().front(), getInputs().back());
connect(getOutputs().back(), current_processor->getInputs().front());
inputs.back().setNeeded();
return {std::move(current_processor)};
}
}
void MergeSortingTransform::consume(Chunk chunk)
{
/** Algorithm:
@@ -215,7 +491,7 @@ void MergeSortingTransform::consume(Chunk chunk)
/// Return the chunk as is.
if (description.empty())
{
setReadyChunk(std::move(chunk));
generated_chunk = std::move(chunk);
return;
}
@@ -246,36 +522,66 @@ void MergeSortingTransform::consume(Chunk chunk)
Poco::File(tmp_path).createDirectories();
temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
const std::string & path = temporary_files.back()->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
NativeBlockOutputStream block_out(compressed_buf, 0, header_without_constants);
MergeSorter merge_sorter(chunks, description, max_merged_block_size, limit);
merge_sorter = std::make_unique<MergeSorter>(std::move(chunks), description, max_merged_block_size, limit);
current_processor = std::make_shared<SinkToNativeStream>(header_without_constants, log, path);
LOG_INFO(log, "Sorting and writing part of data into temporary file " + path);
ProfileEvents::increment(ProfileEvents::ExternalSortWritePart);
/// NOTE. Possibly limit disk usage.
/// NOTE. This should be a separate processor.
/// TODO: Rewrite this code when processors are able to create other processors.
block_out.writePrefix();
while (auto next = merge_sorter.read())
{
auto block = header_without_constants.cloneWithColumns(next.detachColumns());
block_out.write(block);
}
block_out.writeSuffix();
LOG_INFO(log, "Done writing part of data into temporary file " + path);
chunks.clear();
stage = Stage::Serialize;
sum_bytes_in_blocks = 0;
sum_rows_in_blocks = 0;
}
}
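/// Produces the next sorted chunk for the spill sink; resets merge_sorter once it is exhausted.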
void MergeSortingTransform::serialize()
{
current_chunk = merge_sorter->read();
if (!current_chunk)
merge_sorter.reset();
}
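/// On the first call, either creates a MergeSorter over the remaining in-memory chunks or builds a
/// merging pipeline over the temporary files (plus the in-memory remainder); afterwards it keeps
/// emitting chunks from merge_sorter while it exists.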
void MergeSortingTransform::generate()
{
if (!generated_prefix)
{
if (temporary_files.empty())
merge_sorter = std::make_unique<MergeSorter>(std::move(chunks), description, max_merged_block_size, limit);
else
{
ProfileEvents::increment(ProfileEvents::ExternalSortMerge);
LOG_INFO(log, "There are " << temporary_files.size() << " temporary sorted parts to merge.");
/// Create sorted streams to merge.
for (const auto & file : temporary_files)
processors.emplace_back(std::make_unique<SourceFromNativeStream>(header_without_constants, file->path()));
if (!chunks.empty())
processors.emplace_back(std::make_shared<MergeSorterSource>(
header_without_constants, std::move(chunks), description, max_merged_block_size, limit));
current_processor = std::make_shared<MergingSortedTransform>(
header_without_constants, processors.size(), description, max_merged_block_size, limit);
auto next_input = current_processor->getInputs().begin();
for (auto & processor : processors)
connect(processor->getOutputs().front(), *(next_input++));
processors.push_back(current_processor);
}
generated_prefix = true;
}
if (merge_sorter)
{
generated_chunk = merge_sorter->read();
if (!generated_chunk)
merge_sorter.reset();
}
}
void MergeSortingTransform::remerge()
{
LOG_DEBUG(log, "Re-merging intermediate ORDER BY data (" << chunks.size() << " blocks with " << sum_rows_in_blocks << " rows) to save memory consumption");
LOG_DEBUG(log, "Re-merging intermediate ORDER BY data (" << chunks.size()
<< " blocks with " << sum_rows_in_blocks << " rows) to save memory consumption");
/// NOTE Maybe concat all blocks and partial sort will be faster than merge?
MergeSorter merge_sorter(chunks, description, max_merged_block_size, limit);


@@ -14,7 +14,9 @@
namespace DB
{
class MergeSortingTransform : public IAccumulatingTransform
class MergeSorter;
class MergeSortingTransform : public IProcessor
{
public:
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
@@ -27,8 +29,10 @@ public:
String getName() const override { return "MergeSortingTransform"; }
protected:
void consume(Chunk chunk) override;
Chunk generate() override;
Status prepare() override;
void work() override;
Processors expandPipeline() override;
private:
SortDescription description;
@@ -58,25 +62,34 @@ private:
/// Everything below is for external sorting.
std::vector<std::unique_ptr<Poco::TemporaryFile>> temporary_files;
/// For reading data from temporary file.
struct TemporaryFileStream
{
ReadBufferFromFile file_in;
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
TemporaryFileStream(const std::string & path, const Block & header)
: file_in(path), compressed_in(file_in), block_in(std::make_shared<NativeBlockInputStream>(compressed_in, header, 0)) {}
};
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
BlockInputStreams inputs_to_merge;
/// Merge all accumulated blocks to keep no more than limit rows.
void remerge();
void removeConstColumns(Chunk & chunk);
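/// Stages of the external-sort state machine; see prepare() and work() in the implementation.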
enum class Stage
{
Consume = 0,
Generate,
Serialize,
};
Stage stage = Stage::Consume;
bool generated_prefix = false;
Chunk current_chunk;
Chunk generated_chunk;
std::unique_ptr<MergeSorter> merge_sorter;
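/// Processor to attach at the next pipeline expansion (spill sink or merging transform) and the
/// processors returned from expandPipeline().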
ProcessorPtr current_processor;
Processors processors;
Status prepareConsume();
Status prepareSerialize();
Status prepareGenerate();
void consume(Chunk chunk);
void serialize();
void generate();
};
}
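
The MergeSorter used above performs a k-way merge over chunks that are already sorted, keeping a priority queue of cursors and always emitting the smallest current row. Below is a minimal, self-contained sketch of the same technique over plain std::vector<int> runs; kWayMerge and everything else in it are illustrative names only, not part of the ClickHouse codebase.

#include <cstddef>
#include <functional>
#include <queue>
#include <tuple>
#include <vector>

/// Merge several individually sorted runs into one sorted sequence,
/// always taking the smallest current element across all runs.
std::vector<int> kWayMerge(const std::vector<std::vector<int>> & runs)
{
    /// (value, run index, position inside run); std::greater makes the smallest value the top.
    using Cursor = std::tuple<int, size_t, size_t>;
    std::priority_queue<Cursor, std::vector<Cursor>, std::greater<>> queue;

    for (size_t i = 0; i < runs.size(); ++i)
        if (!runs[i].empty())
            queue.emplace(runs[i][0], i, 0);

    std::vector<int> result;
    while (!queue.empty())
    {
        auto [value, run, pos] = queue.top();
        queue.pop();
        result.push_back(value);

        /// Advance the cursor of the run we just consumed from.
        if (pos + 1 < runs[run].size())
            queue.emplace(runs[run][pos + 1], run, pos + 1);
    }
    return result;
}

MergeSorter does the same over ClickHouse sort cursors, and additionally cuts the output into chunks of at most max_merged_block_size rows.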