Limit push down.

This commit is contained in:
Nikolai Kochetov 2020-07-28 17:13:31 +03:00
parent e6508db4f0
commit 9c6627a7eb
11 changed files with 105 additions and 3 deletions

View File

@ -42,6 +42,15 @@ FinishSortingStep::FinishSortingStep(
output_stream->sort_mode = DataStream::SortMode::Stream;
}
/// Install a LIMIT on this step, but only if it tightens the current one.
/// A zero limit_ means "no limit" and is ignored.
void FinishSortingStep::updateLimit(size_t limit_)
{
    const bool no_limit_yet = (limit == 0);
    if (limit_ != 0 && (no_limit_yet || limit_ < limit))
    {
        limit = limit_;
        /// Once a non-zero limit is set, the step may drop rows,
        /// so the number of rows is no longer preserved.
        transform_traits.preserves_number_of_rows = (limit == 0);
    }
}
void FinishSortingStep::transformPipeline(QueryPipeline & pipeline)
{
bool need_finish_sorting = (prefix_description.size() < result_description.size());

View File

@ -22,6 +22,9 @@ public:
void describeActions(FormatSettings & settings) const override;
/// Add limit or change it to lower value.
void updateLimit(size_t limit_);
private:
SortDescription prefix_description;
SortDescription result_description;

View File

@ -23,6 +23,8 @@ public:
void describeActions(FormatSettings & settings) const override;
size_t limitPlusOffset() const { return limit + offset; }
private:
size_t limit;
size_t offset;

View File

@ -45,6 +45,15 @@ MergeSortingStep::MergeSortingStep(
: DataStream::SortMode::Port;
}
/// Install a LIMIT on this step, but only if it tightens the current one.
/// limit_ == 0 means "no limit" and is ignored.
void MergeSortingStep::updateLimit(size_t limit_)
{
    const bool is_tighter = limit_ != 0 && (limit == 0 || limit_ < limit);
    if (is_tighter)
    {
        limit = limit_;
        /// A non-zero limit means the step may cut rows off.
        transform_traits.preserves_number_of_rows = (limit == 0);
    }
}
void MergeSortingStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr

View File

@ -27,6 +27,9 @@ public:
void describeActions(FormatSettings & settings) const override;
/// Add limit or change it to lower value.
void updateLimit(size_t limit_);
private:
SortDescription description;
size_t max_merged_block_size;

View File

@ -37,6 +37,15 @@ MergingSortedStep::MergingSortedStep(
output_stream->sort_mode = DataStream::SortMode::Stream;
}
/// Install a LIMIT on this step, but only if it tightens the current one.
/// Passing 0 (no limit) is a no-op.
void MergingSortedStep::updateLimit(size_t limit_)
{
    if (limit_ == 0)
        return;

    if (limit == 0 || limit_ < limit)
    {
        limit = limit_;
        /// The step may now drop rows past the limit.
        transform_traits.preserves_number_of_rows = (limit == 0);
    }
}
void MergingSortedStep::transformPipeline(QueryPipeline & pipeline)
{
/// If there are several streams, then we merge them into one

View File

@ -23,6 +23,9 @@ public:
void describeActions(FormatSettings & settings) const override;
/// Add limit or change it to lower value.
void updateLimit(size_t limit_);
private:
SortDescription sort_description;
size_t max_block_size;

View File

@ -37,6 +37,15 @@ PartialSortingStep::PartialSortingStep(
output_stream->sort_mode = DataStream::SortMode::Chunk;
}
/// Install a LIMIT on this step, but only if it tightens the current one.
/// limit_ == 0 means "no limit" and leaves the step unchanged.
void PartialSortingStep::updateLimit(size_t limit_)
{
    if (limit_ == 0)
        return;

    const bool tightens = (limit == 0) || (limit_ < limit);
    if (tightens)
    {
        limit = limit_;
        /// With a real limit in place, rows past it may be dropped.
        transform_traits.preserves_number_of_rows = (limit == 0);
    }
}
void PartialSortingStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr

View File

@ -22,6 +22,9 @@ public:
void describeActions(FormatSettings & settings) const override;
/// Add limit or change it to lower value.
void updateLimit(size_t limit_);
private:
SortDescription sort_description;
UInt64 limit;

View File

@ -5,6 +5,10 @@
#include <IO/Operators.h>
#include <stack>
#include <Processors/QueryPlan/LimitStep.h>
#include "MergingSortedStep.h"
#include "FinishSortingStep.h"
#include "MergeSortingStep.h"
#include "PartialSortingStep.h"
namespace DB
{
@ -129,6 +133,7 @@ void QueryPlan::addStep(QueryPlanStepPtr step)
QueryPipelinePtr QueryPlan::buildQueryPipeline()
{
checkInitialized();
optimize();
struct Frame
{
@ -308,8 +313,50 @@ void QueryPlan::explainPipeline(WriteBuffer & buffer, const ExplainPipelineOptio
}
}
static void tryPushDownLimit(QueryPlanStepPtr & parent, QueryPlanStepPtr & child)
/// If plan looks like Limit -> Sorting, update limit for Sorting.
/// Sorting steps often come as a chain
/// PartialSorting -> MergeSorting -> MergingSorted, so after updating a step
/// we also walk down to its child and try to update it as well.
/// Returns whether the limit was applied to the top (first) node.
bool tryUpdateLimitForSortingSteps(QueryPlan::Node * node, size_t limit)
{
    bool top_node_updated = false;

    for (QueryPlan::Node * current = node; current != nullptr;)
    {
        QueryPlanStepPtr & step = current->step;
        QueryPlan::Node * next = nullptr;
        bool matched = false;

        if (auto * merging_sorted = typeid_cast<MergingSortedStep *>(step.get()))
        {
            /// TODO: remove LimitStep here.
            merging_sorted->updateLimit(limit);
            matched = true;
            next = current->children.front();
        }
        else if (auto * finish_sorting = typeid_cast<FinishSortingStep *>(step.get()))
        {
            /// TODO: remove LimitStep here.
            finish_sorting->updateLimit(limit);
            matched = true;
        }
        else if (auto * merge_sorting = typeid_cast<MergeSortingStep *>(step.get()))
        {
            merge_sorting->updateLimit(limit);
            matched = true;
            next = current->children.front();
        }
        else if (auto * partial_sorting = typeid_cast<PartialSortingStep *>(step.get()))
        {
            partial_sorting->updateLimit(limit);
            matched = true;
        }

        /// Only the top node's result is reported to the caller;
        /// updates further down the chain are best-effort.
        if (current == node)
            top_node_updated = matched;

        current = next;
    }

    return top_node_updated;
}
/// Move LimitStep down if possible.
static void tryPushDownLimit(QueryPlanStepPtr & parent, QueryPlan::Node * child_node)
{
auto & child = child_node->step;
const auto * limit = typeid_cast<const LimitStep *>(parent.get());
if (!limit)
@ -321,6 +368,10 @@ static void tryPushDownLimit(QueryPlanStepPtr & parent, QueryPlanStepPtr & child
if (!transforming)
return;
/// Special cases for sorting steps.
if (tryUpdateLimitForSortingSteps(child_node, limit->limitPlusOffset()))
return;
/// Now we should decide if pushing down limit possible for this step.
const auto & transform_traits = transforming->getTransformTraits();
@ -356,7 +407,7 @@ void QueryPlan::optimize()
{
/// First entrance, try push down.
if (frame.node->children.size() == 1)
tryPushDownLimit(frame.node->step, frame.node->children.front()->step);
tryPushDownLimit(frame.node->step, frame.node->children.front());
}
if (frame.next_child < frame.node->children.size())

View File

@ -62,7 +62,6 @@ public:
void addInterpreterContext(std::shared_ptr<Context> context);
private:
/// Tree node. Step and it's children.
struct Node
{
@ -70,6 +69,8 @@ private:
std::vector<Node *> children = {};
};
private:
using Nodes = std::list<Node>;
Nodes nodes;