Better pipeline expanding. MergeSortingTransform refactoring.

This commit is contained in:
Nikolai Kochetov 2019-04-16 14:52:29 +03:00
parent aac244a1ce
commit 4cf8c6cfdd
2 changed files with 2 additions and 3 deletions

View File

@ -290,7 +290,7 @@ IProcessor::Status MergeSortingTransform::prepare()
{
if (stage == Stage::Serialize)
{
if (current_processor)
if (!processors.empty())
return Status::ExpandPipeline;
auto status = prepareSerialize();
@ -501,7 +501,7 @@ void MergeSortingTransform::consume(Chunk chunk)
temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
const std::string & path = temporary_files.back()->path();
merge_sorter = std::make_unique<MergeSorter>(std::move(chunks), description, max_merged_block_size, limit);
current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, log, path);
auto current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, log, path);
processors.emplace_back(current_processor);

View File

@ -83,7 +83,6 @@ private:
Chunk generated_chunk;
std::unique_ptr<MergeSorter> merge_sorter;
ProcessorPtr current_processor;
ProcessorPtr external_merging_sorted;
Processors processors;