2020-06-17 19:57:13 +00:00
|
|
|
#include <Processors/QueryPlan/FinishSortingStep.h>
|
|
|
|
#include <Processors/Transforms/DistinctTransform.h>
|
|
|
|
#include <Processors/QueryPipeline.h>
|
|
|
|
#include <Processors/Merges/MergingSortedTransform.h>
|
|
|
|
#include <Processors/Transforms/PartialSortingTransform.h>
|
|
|
|
#include <Processors/Transforms/FinishSortingTransform.h>
|
2020-06-27 14:02:24 +00:00
|
|
|
#include <IO/Operators.h>
|
2020-06-17 19:57:13 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2020-06-18 13:00:16 +00:00
|
|
|
/// Builds the stream traits handed to the ITransformingStep base class:
/// distinct columns are preserved, the step returns a single stream,
/// and the number of streams is not preserved.
static ITransformingStep::DataStreamTraits getTraits()
{
    ITransformingStep::DataStreamTraits traits{};
    traits.preserves_distinct_columns = true;
    traits.returns_single_stream = true;
    traits.preserves_number_of_streams = false;
    return traits;
}
|
|
|
|
|
2020-06-17 19:57:13 +00:00
|
|
|
/// Creates the step.
///  input_stream_       - input stream; its header is forwarded unchanged to the base class
///                        (presumably as the output header - confirm against ITransformingStep).
///  prefix_description_ - sort description used when merging the incoming streams.
///  result_description_ - sort description used by the partial/finish sorting transforms.
///  max_block_size_     - block size passed to the merging and finish-sorting transforms.
///  limit_              - row limit passed to the transforms; 0 is treated as "no limit" by describeActions.
FinishSortingStep::FinishSortingStep(
    const DataStream & input_stream_,
    SortDescription prefix_description_,
    SortDescription result_description_,
    size_t max_block_size_,
    UInt64 limit_)
    : ITransformingStep(input_stream_, input_stream_.header, getTraits())
    , prefix_description(std::move(prefix_description_))
    , result_description(std::move(result_description_))
    , max_block_size(max_block_size_)
    , limit(limit_)
{
}
|
|
|
|
|
|
|
|
/// Appends the processors of this step to the pipeline:
///  1. if there is more than one stream, merge them with MergingSortedTransform by prefix_description;
///  2. enable quota for the current streams;
///  3. if result_description is longer than prefix_description, add PartialSortingTransform
///     (main streams only) followed by FinishSortingTransform.
void FinishSortingStep::transformPipeline(QueryPipeline & pipeline)
{
    const bool sort_suffix_required = prefix_description.size() < result_description.size();

    if (pipeline.getNumStreams() > 1)
    {
        /// The limit may be applied while merging only when no further sorting follows.
        UInt64 merge_limit = 0;
        if (!sort_suffix_required)
            merge_limit = limit;

        auto merging = std::make_shared<MergingSortedTransform>(
            pipeline.getHeader(),
            pipeline.getNumStreams(),
            prefix_description,
            max_block_size,
            merge_limit);

        pipeline.addPipe({ std::move(merging) });
    }

    pipeline.enableQuotaForCurrentStreams();

    if (!sort_suffix_required)
        return;

    pipeline.addSimpleTransform([this](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
    {
        /// Only the main stream gets a partial sorting transform.
        if (stream_type == QueryPipeline::StreamType::Main)
            return std::make_shared<PartialSortingTransform>(header, result_description, limit);

        return nullptr;
    });

    /// NOTE limits are not applied to the size of temporary sets in FinishSortingTransform
    pipeline.addSimpleTransform([this](const Block & header) -> ProcessorPtr
    {
        return std::make_shared<FinishSortingTransform>(
            header, prefix_description, result_description, max_block_size, limit);
    });
}
|
|
|
|
|
2020-06-27 14:02:24 +00:00
|
|
|
/// Writes a textual description of the step to settings.out, each line indented
/// by settings.offset spaces: the prefix sort description, the result sort description
/// and, when limit is non-zero, the limit.
void FinishSortingStep::describeActions(FormatSettings & settings) const
{
    const String indent(settings.offset, ' ');
    const auto & header = input_streams.front().header;
    auto & out = settings.out;

    out << indent << "Prefix sort description: ";
    dumpSortDescription(prefix_description, header, out);
    out << '\n';

    out << indent << "Result sort description: ";
    dumpSortDescription(result_description, header, out);
    out << '\n';

    if (limit != 0)
        out << indent << "Limit " << limit << '\n';
}
|
|
|
|
|
2020-06-17 19:57:13 +00:00
|
|
|
}
|