diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 4a7a411f45a..7c621513cab 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -38,18 +38,9 @@ namespace std } #endif -#include -#include #include -#include #include -#include -#include -#include -#include #include -#include -#include #include #include #include @@ -59,6 +50,10 @@ namespace std #include #include #include +#include +#include +#include +#include #include #include #include @@ -1096,16 +1091,14 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( }; BlockInputStreamPtr merged; + ProcessorPtr merged_processor; switch (data.merging_params.mode) { case MergeTreeData::MergingParams::Ordinary: { - auto merged_processor = - std::make_shared(header, pipes.size(), sort_description, max_block_size); - Pipe pipe(std::move(pipes), std::move(merged_processor)); - pipes = Pipes(); - pipes.emplace_back(std::move(pipe)); - return pipes; + merged_processor = std::make_shared(header, pipes.size(), + sort_description, max_block_size); + break; } case MergeTreeData::MergingParams::Collapsing: @@ -1114,28 +1107,36 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( break; case MergeTreeData::MergingParams::Summing: - merged = std::make_shared(streams_to_merge(), + merged_processor = std::make_shared(header, pipes.size(), sort_description, data.merging_params.columns_to_sum, max_block_size); break; case MergeTreeData::MergingParams::Aggregating: - merged = std::make_shared(streams_to_merge(), sort_description, max_block_size); + merged_processor = std::make_shared(header, pipes.size(), + sort_description, max_block_size); break; case MergeTreeData::MergingParams::Replacing: /// TODO Make ReplacingFinalBlockInputStream - merged = std::make_shared(streams_to_merge(), + merged_processor = std::make_shared(header, pipes.size(), sort_description, data.merging_params.version_column, max_block_size); break; case MergeTreeData::MergingParams::VersionedCollapsing: /// TODO Make VersionedCollapsingFinalBlockInputStream - merged = std::make_shared( - streams_to_merge(), sort_description, data.merging_params.sign_column, max_block_size); + merged_processor = std::make_shared(header, pipes.size(), + sort_description, data.merging_params.sign_column, max_block_size); break; case MergeTreeData::MergingParams::Graphite: throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR); } + if (merged_processor) + { + Pipe pipe(std::move(pipes), std::move(merged_processor)); + pipes = Pipes(); + pipes.emplace_back(std::move(pipe)); + } + if (merged) pipes.emplace_back(std::make_shared(merged));