From 9c6627a7eb72bf4bd2006bfaa119d5b5df47bb87 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 28 Jul 2020 17:13:31 +0300 Subject: [PATCH] Limit push down. --- .../QueryPlan/FinishSortingStep.cpp | 9 +++ src/Processors/QueryPlan/FinishSortingStep.h | 3 + src/Processors/QueryPlan/LimitStep.h | 2 + src/Processors/QueryPlan/MergeSortingStep.cpp | 9 +++ src/Processors/QueryPlan/MergeSortingStep.h | 3 + .../QueryPlan/MergingSortedStep.cpp | 9 +++ src/Processors/QueryPlan/MergingSortedStep.h | 3 + .../QueryPlan/PartialSortingStep.cpp | 9 +++ src/Processors/QueryPlan/PartialSortingStep.h | 3 + src/Processors/QueryPlan/QueryPlan.cpp | 55 ++++++++++++++++++- src/Processors/QueryPlan/QueryPlan.h | 3 +- 11 files changed, 105 insertions(+), 3 deletions(-) diff --git a/src/Processors/QueryPlan/FinishSortingStep.cpp b/src/Processors/QueryPlan/FinishSortingStep.cpp index 87d44baea06..8d9db8f83a1 100644 --- a/src/Processors/QueryPlan/FinishSortingStep.cpp +++ b/src/Processors/QueryPlan/FinishSortingStep.cpp @@ -42,6 +42,15 @@ FinishSortingStep::FinishSortingStep( output_stream->sort_mode = DataStream::SortMode::Stream; } +void FinishSortingStep::updateLimit(size_t limit_) +{ + if (limit_ && (limit == 0 || limit_ < limit)) + { + limit = limit_; + transform_traits.preserves_number_of_rows = limit == 0; + } +} + void FinishSortingStep::transformPipeline(QueryPipeline & pipeline) { bool need_finish_sorting = (prefix_description.size() < result_description.size()); diff --git a/src/Processors/QueryPlan/FinishSortingStep.h b/src/Processors/QueryPlan/FinishSortingStep.h index 41a96b9456a..4bb62037faa 100644 --- a/src/Processors/QueryPlan/FinishSortingStep.h +++ b/src/Processors/QueryPlan/FinishSortingStep.h @@ -22,6 +22,9 @@ public: void describeActions(FormatSettings & settings) const override; + /// Add limit or change it to lower value. + void updateLimit(size_t limit_); + private: SortDescription prefix_description; SortDescription result_description; diff --git a/src/Processors/QueryPlan/LimitStep.h b/src/Processors/QueryPlan/LimitStep.h index e04ecfcb471..7c5bdc75b3f 100644 --- a/src/Processors/QueryPlan/LimitStep.h +++ b/src/Processors/QueryPlan/LimitStep.h @@ -23,6 +23,8 @@ public: void describeActions(FormatSettings & settings) const override; + size_t limitPlusOffset() const { return limit + offset; } + private: size_t limit; size_t offset; diff --git a/src/Processors/QueryPlan/MergeSortingStep.cpp b/src/Processors/QueryPlan/MergeSortingStep.cpp index 529c505fc1c..fb263cbcca1 100644 --- a/src/Processors/QueryPlan/MergeSortingStep.cpp +++ b/src/Processors/QueryPlan/MergeSortingStep.cpp @@ -45,6 +45,15 @@ MergeSortingStep::MergeSortingStep( : DataStream::SortMode::Port; } +void MergeSortingStep::updateLimit(size_t limit_) +{ + if (limit_ && (limit == 0 || limit_ < limit)) + { + limit = limit_; + transform_traits.preserves_number_of_rows = limit == 0; + } +} + void MergeSortingStep::transformPipeline(QueryPipeline & pipeline) { pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr diff --git a/src/Processors/QueryPlan/MergeSortingStep.h b/src/Processors/QueryPlan/MergeSortingStep.h index 0bbc066622e..a54ea7ac365 100644 --- a/src/Processors/QueryPlan/MergeSortingStep.h +++ b/src/Processors/QueryPlan/MergeSortingStep.h @@ -27,6 +27,9 @@ public: void describeActions(FormatSettings & settings) const override; + /// Add limit or change it to lower value. + void updateLimit(size_t limit_); + private: SortDescription description; size_t max_merged_block_size; diff --git a/src/Processors/QueryPlan/MergingSortedStep.cpp b/src/Processors/QueryPlan/MergingSortedStep.cpp index 9fc92c9a4ae..3b767d91219 100644 --- a/src/Processors/QueryPlan/MergingSortedStep.cpp +++ b/src/Processors/QueryPlan/MergingSortedStep.cpp @@ -37,6 +37,15 @@ MergingSortedStep::MergingSortedStep( output_stream->sort_mode = DataStream::SortMode::Stream; } +void MergingSortedStep::updateLimit(size_t limit_) +{ + if (limit_ && (limit == 0 || limit_ < limit)) + { + limit = limit_; + transform_traits.preserves_number_of_rows = limit == 0; + } +} + void MergingSortedStep::transformPipeline(QueryPipeline & pipeline) { /// If there are several streams, then we merge them into one diff --git a/src/Processors/QueryPlan/MergingSortedStep.h b/src/Processors/QueryPlan/MergingSortedStep.h index a5b0c12b1fb..483cfa5e8a7 100644 --- a/src/Processors/QueryPlan/MergingSortedStep.h +++ b/src/Processors/QueryPlan/MergingSortedStep.h @@ -23,6 +23,9 @@ public: void describeActions(FormatSettings & settings) const override; + /// Add limit or change it to lower value. + void updateLimit(size_t limit_); + private: SortDescription sort_description; size_t max_block_size; diff --git a/src/Processors/QueryPlan/PartialSortingStep.cpp b/src/Processors/QueryPlan/PartialSortingStep.cpp index 3d7396f9956..50e25fe0e00 100644 --- a/src/Processors/QueryPlan/PartialSortingStep.cpp +++ b/src/Processors/QueryPlan/PartialSortingStep.cpp @@ -37,6 +37,15 @@ PartialSortingStep::PartialSortingStep( output_stream->sort_mode = DataStream::SortMode::Chunk; } +void PartialSortingStep::updateLimit(size_t limit_) +{ + if (limit_ && (limit == 0 || limit_ < limit)) + { + limit = limit_; + transform_traits.preserves_number_of_rows = limit == 0; + } +} + void PartialSortingStep::transformPipeline(QueryPipeline & pipeline) { pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr diff --git a/src/Processors/QueryPlan/PartialSortingStep.h b/src/Processors/QueryPlan/PartialSortingStep.h index 12e9cec961a..172ef25c300 100644 --- a/src/Processors/QueryPlan/PartialSortingStep.h +++ b/src/Processors/QueryPlan/PartialSortingStep.h @@ -22,6 +22,9 @@ public: void describeActions(FormatSettings & settings) const override; + /// Add limit or change it to lower value. + void updateLimit(size_t limit_); + private: SortDescription sort_description; UInt64 limit; diff --git a/src/Processors/QueryPlan/QueryPlan.cpp b/src/Processors/QueryPlan/QueryPlan.cpp index 58d01f534d0..17ed11993c0 100644 --- a/src/Processors/QueryPlan/QueryPlan.cpp +++ b/src/Processors/QueryPlan/QueryPlan.cpp @@ -5,6 +5,10 @@ #include #include #include +#include "MergingSortedStep.h" +#include "FinishSortingStep.h" +#include "MergeSortingStep.h" +#include "PartialSortingStep.h" namespace DB { @@ -129,6 +133,7 @@ void QueryPlan::addStep(QueryPlanStepPtr step) QueryPipelinePtr QueryPlan::buildQueryPipeline() { checkInitialized(); + optimize(); struct Frame { @@ -308,8 +313,50 @@ void QueryPlan::explainPipeline(WriteBuffer & buffer, const ExplainPipelineOptio } } -static void tryPushDownLimit(QueryPlanStepPtr & parent, QueryPlanStepPtr & child) +/// If plan looks like Limit -> Sorting, update limit for Sorting +bool tryUpdateLimitForSortingSteps(QueryPlan::Node * node, size_t limit) { + QueryPlanStepPtr & step = node->step; + QueryPlan::Node * child = nullptr; + bool updated = false; + + if (auto * merging_sorted = typeid_cast(step.get())) + { + /// TODO: remove LimitStep here. + merging_sorted->updateLimit(limit); + updated = true; + child = node->children.front(); + } + else if (auto * finish_sorting = typeid_cast(step.get())) + { + /// TODO: remove LimitStep here. + finish_sorting->updateLimit(limit); + updated = true; + } + else if (auto * merge_sorting = typeid_cast(step.get())) + { + merge_sorting->updateLimit(limit); + updated = true; + child = node->children.front(); + } + else if (auto * partial_sorting = typeid_cast(step.get())) + { + partial_sorting->updateLimit(limit); + updated = true; + } + + /// We often have chain PartialSorting -> MergeSorting -> MergingSorted + /// Try update limit for them also if possible. + if (child) + tryUpdateLimitForSortingSteps(child, limit); + + return updated; +} + +/// Move LimitStep down if possible. +static void tryPushDownLimit(QueryPlanStepPtr & parent, QueryPlan::Node * child_node) +{ + auto & child = child_node->step; const auto * limit = typeid_cast(parent.get()); if (!limit) @@ -321,6 +368,10 @@ static void tryPushDownLimit(QueryPlanStepPtr & parent, QueryPlanStepPtr & child if (!transforming) return; + /// Special cases for sorting steps. + if (tryUpdateLimitForSortingSteps(child_node, limit->limitPlusOffset())) + return; + /// Now we should decide if pushing down limit possible for this step. const auto & transform_traits = transforming->getTransformTraits(); @@ -356,7 +407,7 @@ void QueryPlan::optimize() { /// First entrance, try push down. if (frame.node->children.size() == 1) - tryPushDownLimit(frame.node->step, frame.node->children.front()->step); + tryPushDownLimit(frame.node->step, frame.node->children.front()); } if (frame.next_child < frame.node->children.size()) diff --git a/src/Processors/QueryPlan/QueryPlan.h b/src/Processors/QueryPlan/QueryPlan.h index a933688ead7..17ff7dde3af 100644 --- a/src/Processors/QueryPlan/QueryPlan.h +++ b/src/Processors/QueryPlan/QueryPlan.h @@ -62,7 +62,6 @@ public: void addInterpreterContext(std::shared_ptr context); -private: /// Tree node. Step and it's children. struct Node { @@ -70,6 +69,8 @@ private: std::vector children = {}; }; +private: + using Nodes = std::list; Nodes nodes;