ClickHouse/src/Processors/QueryPlan/MergingSortedStep.cpp

77 lines
2.1 KiB
C++
Raw Normal View History

2020-06-16 15:47:40 +00:00
#include <Processors/QueryPlan/MergingSortedStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Merges/MergingSortedTransform.h>
2020-06-27 14:02:24 +00:00
#include <IO/Operators.h>
2020-06-16 15:47:40 +00:00
namespace DB
{
static ITransformingStep::Traits getTraits(size_t limit)
2020-06-18 13:00:16 +00:00
{
return ITransformingStep::Traits
2020-06-18 20:11:42 +00:00
{
{
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = limit == 0,
}
2020-06-18 13:00:16 +00:00
};
}
2020-06-16 15:47:40 +00:00
MergingSortedStep::MergingSortedStep(
2020-06-16 16:13:07 +00:00
const DataStream & input_stream,
2020-06-16 15:47:40 +00:00
SortDescription sort_description_,
size_t max_block_size_,
UInt64 limit_)
2020-07-27 12:48:33 +00:00
: ITransformingStep(input_stream, input_stream.header, getTraits(limit_))
2020-06-16 15:47:40 +00:00
, sort_description(std::move(sort_description_))
, max_block_size(max_block_size_)
, limit(limit_)
{
/// TODO: check input_stream is partially sorted (each port) by the same description.
output_stream->sort_description = sort_description;
output_stream->sort_mode = DataStream::SortMode::Stream;
2020-06-16 15:47:40 +00:00
}
2020-07-28 14:13:31 +00:00
void MergingSortedStep::updateLimit(size_t limit_)
{
if (limit_ && (limit == 0 || limit_ < limit))
{
limit = limit_;
transform_traits.preserves_number_of_rows = limit == 0;
}
}
2020-06-16 15:47:40 +00:00
void MergingSortedStep::transformPipeline(QueryPipeline & pipeline)
{
/// If there are several streams, then we merge them into one
if (pipeline.getNumStreams() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
sort_description,
max_block_size, limit);
2020-08-04 13:06:59 +00:00
pipeline.addTransform(std::move(transform));
2020-06-16 15:47:40 +00:00
}
}
2020-06-27 14:02:24 +00:00
void MergingSortedStep::describeActions(FormatSettings & settings) const
2020-06-24 12:09:01 +00:00
{
2020-06-27 14:02:24 +00:00
String prefix(settings.offset, ' ');
settings.out << prefix << "Sort description: ";
dumpSortDescription(sort_description, input_streams.front().header, settings.out);
settings.out << '\n';
if (limit)
settings.out << prefix << "Limit " << limit << '\n';
2020-06-24 12:09:01 +00:00
}
2020-06-16 15:47:40 +00:00
}