Use processors for final.

This commit is contained in:
Nikolai Kochetov 2020-04-14 13:04:49 +03:00
parent 1ce2e1b93b
commit 12f4cfb2c7

View File

@ -38,18 +38,9 @@ namespace std
} }
#endif #endif
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/CollapsingFinalBlockInputStream.h> #include <DataStreams/CollapsingFinalBlockInputStream.h>
#include <DataStreams/AddingConstColumnBlockInputStream.h>
#include <DataStreams/CreatingSetsBlockInputStream.h> #include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/SummingSortedBlockInputStream.h>
#include <DataStreams/ReplacingSortedBlockInputStream.h>
#include <DataStreams/ReverseBlockInputStream.h> #include <DataStreams/ReverseBlockInputStream.h>
#include <DataStreams/AggregatingSortedBlockInputStream.h>
#include <DataStreams/VersionedCollapsingSortedBlockInputStream.h>
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDate.h> #include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeEnum.h> #include <DataTypes/DataTypeEnum.h>
@ -59,6 +50,10 @@ namespace std
#include <Processors/Transforms/ExpressionTransform.h> #include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/ReverseTransform.h> #include <Processors/Transforms/ReverseTransform.h>
#include <Processors/Merges/MergingSortedTransform.h> #include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Merges/SummingSortedTransform.h>
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/ReplacingSortedTransform.h>
#include <Processors/Merges/VersionedCollapsingTransform.h>
#include <Processors/Executors/TreeExecutorBlockInputStream.h> #include <Processors/Executors/TreeExecutorBlockInputStream.h>
#include <Processors/Sources/SourceFromInputStream.h> #include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/ConcatProcessor.h> #include <Processors/ConcatProcessor.h>
@ -1096,16 +1091,14 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
}; };
BlockInputStreamPtr merged; BlockInputStreamPtr merged;
ProcessorPtr merged_processor;
switch (data.merging_params.mode) switch (data.merging_params.mode)
{ {
case MergeTreeData::MergingParams::Ordinary: case MergeTreeData::MergingParams::Ordinary:
{ {
auto merged_processor = merged_processor = std::make_shared<MergingSortedTransform>(header, pipes.size(),
std::make_shared<MergingSortedTransform>(header, pipes.size(), sort_description, max_block_size); sort_description, max_block_size);
Pipe pipe(std::move(pipes), std::move(merged_processor)); break;
pipes = Pipes();
pipes.emplace_back(std::move(pipe));
return pipes;
} }
case MergeTreeData::MergingParams::Collapsing: case MergeTreeData::MergingParams::Collapsing:
@ -1114,28 +1107,36 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
break; break;
case MergeTreeData::MergingParams::Summing: case MergeTreeData::MergingParams::Summing:
merged = std::make_shared<SummingSortedBlockInputStream>(streams_to_merge(), merged_processor = std::make_shared<SummingSortedTransform>(header, pipes.size(),
sort_description, data.merging_params.columns_to_sum, max_block_size); sort_description, data.merging_params.columns_to_sum, max_block_size);
break; break;
case MergeTreeData::MergingParams::Aggregating: case MergeTreeData::MergingParams::Aggregating:
merged = std::make_shared<AggregatingSortedBlockInputStream>(streams_to_merge(), sort_description, max_block_size); merged_processor = std::make_shared<AggregatingSortedTransform>(header, pipes.size(),
sort_description, max_block_size);
break; break;
case MergeTreeData::MergingParams::Replacing: /// TODO Make ReplacingFinalBlockInputStream case MergeTreeData::MergingParams::Replacing: /// TODO Make ReplacingFinalBlockInputStream
merged = std::make_shared<ReplacingSortedBlockInputStream>(streams_to_merge(), merged_processor = std::make_shared<ReplacingSortedTransform>(header, pipes.size(),
sort_description, data.merging_params.version_column, max_block_size); sort_description, data.merging_params.version_column, max_block_size);
break; break;
case MergeTreeData::MergingParams::VersionedCollapsing: /// TODO Make VersionedCollapsingFinalBlockInputStream case MergeTreeData::MergingParams::VersionedCollapsing: /// TODO Make VersionedCollapsingFinalBlockInputStream
merged = std::make_shared<VersionedCollapsingSortedBlockInputStream>( merged_processor = std::make_shared<VersionedCollapsingTransform>(header, pipes.size(),
streams_to_merge(), sort_description, data.merging_params.sign_column, max_block_size); sort_description, data.merging_params.sign_column, max_block_size);
break; break;
case MergeTreeData::MergingParams::Graphite: case MergeTreeData::MergingParams::Graphite:
throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR); throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
} }
if (merged_processor)
{
Pipe pipe(std::move(pipes), std::move(merged_processor));
pipes = Pipes();
pipes.emplace_back(std::move(pipe));
}
if (merged) if (merged)
pipes.emplace_back(std::make_shared<SourceFromInputStream>(merged)); pipes.emplace_back(std::make_shared<SourceFromInputStream>(merged));