Add FinishSortingStep.

This commit is contained in:
Nikolai Kochetov 2020-06-17 22:57:13 +03:00
parent e8049d34c5
commit 169ad5e805
4 changed files with 99 additions and 33 deletions

View File

@ -95,6 +95,7 @@
#include <Processors/QueryPlan/FillingStep.h>
#include <Processors/QueryPlan/ExtremesStep.h>
#include <Processors/QueryPlan/OffsetsStep.h>
#include <Processors/QueryPlan/FinishSortingStep.h>
namespace DB
@ -1572,40 +1573,14 @@ void InterpreterSelectQuery::executeOrderOptimized(QueryPipeline & pipeline, Inp
{
const Settings & settings = context->getSettingsRef();
bool need_finish_sorting = (input_sorting_info->order_key_prefix_descr.size() < output_order_descr.size());
if (pipeline.getNumStreams() > 1)
{
UInt64 limit_for_merging = (need_finish_sorting ? 0 : limit);
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
input_sorting_info->order_key_prefix_descr,
settings.max_block_size, limit_for_merging);
FinishSortingStep finish_sorting_step(
DataStream{.header = pipeline.getHeader()},
input_sorting_info->order_key_prefix_descr,
output_order_descr,
settings.max_block_size,
limit);
pipeline.addPipe({ std::move(transform) });
}
pipeline.enableQuotaForCurrentStreams();
if (need_finish_sorting)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipeline::StreamType::Main)
return nullptr;
return std::make_shared<PartialSortingTransform>(header, output_order_descr, limit);
});
/// NOTE limits are not applied to the size of temporary sets in FinishSortingTransform
pipeline.addSimpleTransform([&](const Block & header) -> ProcessorPtr
{
return std::make_shared<FinishSortingTransform>(
header, input_sorting_info->order_key_prefix_descr,
output_order_descr, settings.max_block_size, limit);
});
}
finish_sorting_step.transformPipeline(pipeline);
}
void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputOrderInfoPtr input_sorting_info)

View File

@ -0,0 +1,61 @@
#include <Processors/QueryPlan/FinishSortingStep.h>
#include <Processors/Transforms/DistinctTransform.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Transforms/FinishSortingTransform.h>
namespace DB
{
FinishSortingStep::FinishSortingStep(
const DataStream & input_stream_,
SortDescription prefix_description_,
SortDescription result_description_,
size_t max_block_size_,
UInt64 limit_)
: ITransformingStep(input_stream_, input_stream_)
, prefix_description(std::move(prefix_description_))
, result_description(std::move(result_description_))
, max_block_size(max_block_size_)
, limit(limit_)
{
}
void FinishSortingStep::transformPipeline(QueryPipeline & pipeline)
{
bool need_finish_sorting = (prefix_description.size() < result_description.size());
if (pipeline.getNumStreams() > 1)
{
UInt64 limit_for_merging = (need_finish_sorting ? 0 : limit);
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
prefix_description,
max_block_size, limit_for_merging);
pipeline.addPipe({ std::move(transform) });
}
pipeline.enableQuotaForCurrentStreams();
if (need_finish_sorting)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipeline::StreamType::Main)
return nullptr;
return std::make_shared<PartialSortingTransform>(header, result_description, limit);
});
/// NOTE limits are not applied to the size of temporary sets in FinishSortingTransform
pipeline.addSimpleTransform([&](const Block & header) -> ProcessorPtr
{
return std::make_shared<FinishSortingTransform>(
header, prefix_description, result_description, max_block_size, limit);
});
}
}
}

View File

@ -0,0 +1,29 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Core/SortDescription.h>
namespace DB
{
class FinishSortingStep : public ITransformingStep
{
public:
FinishSortingStep(
const DataStream & input_stream_,
SortDescription prefix_description_,
SortDescription result_description_,
size_t max_block_size,
UInt64 limit);
String getName() const override { return "FinishSorting"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
SortDescription prefix_description;
SortDescription result_description;
size_t max_block_size;
UInt64 limit;
};
}

View File

@ -146,6 +146,7 @@ SRCS(
QueryPlan/ExtremesStep.cpp
QueryPlan/FillingStep.cpp
QueryPlan/FilterStep.cpp
QueryPlan/FinishSortingStep.cpp
QueryPlan/ISourceStep.cpp
QueryPlan/ITransformingStep.cpp
QueryPlan/IQueryPlanStep.cpp