From fb3a50024d44f9ce91dccabd72ff3666f9558796 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 27 Jul 2020 15:48:33 +0300 Subject: [PATCH] Added preserves_number_of_rows trait. --- .../QueryPlan/AddingDelayedSourceStep.cpp | 1 + src/Processors/QueryPlan/AggregatingStep.cpp | 1 + src/Processors/QueryPlan/ConvertingStep.cpp | 1 + src/Processors/QueryPlan/CreatingSetsStep.cpp | 1 + src/Processors/QueryPlan/CubeStep.cpp | 1 + src/Processors/QueryPlan/DistinctStep.cpp | 1 + src/Processors/QueryPlan/ExpressionStep.cpp | 1 + src/Processors/QueryPlan/ExtremesStep.cpp | 1 + src/Processors/QueryPlan/FillingStep.cpp | 1 + src/Processors/QueryPlan/FilterStep.cpp | 1 + src/Processors/QueryPlan/FinishSortingStep.cpp | 5 +++-- src/Processors/QueryPlan/ITransformingStep.cpp | 1 + src/Processors/QueryPlan/ITransformingStep.h | 8 ++++++++ src/Processors/QueryPlan/LimitByStep.cpp | 1 + src/Processors/QueryPlan/LimitStep.cpp | 1 + src/Processors/QueryPlan/MergeSortingStep.cpp | 5 +++-- src/Processors/QueryPlan/MergingAggregatedStep.cpp | 1 + src/Processors/QueryPlan/MergingSortedStep.cpp | 5 +++-- src/Processors/QueryPlan/OffsetStep.cpp | 1 + src/Processors/QueryPlan/PartialSortingStep.cpp | 5 +++-- src/Processors/QueryPlan/QueryPlan.cpp | 13 +++++++++++++ src/Processors/QueryPlan/RollupStep.cpp | 1 + src/Processors/QueryPlan/TotalsHavingStep.cpp | 5 +++-- 23 files changed, 52 insertions(+), 10 deletions(-) diff --git a/src/Processors/QueryPlan/AddingDelayedSourceStep.cpp b/src/Processors/QueryPlan/AddingDelayedSourceStep.cpp index 9326d7808ba..e5c84c4c881 100644 --- a/src/Processors/QueryPlan/AddingDelayedSourceStep.cpp +++ b/src/Processors/QueryPlan/AddingDelayedSourceStep.cpp @@ -11,6 +11,7 @@ static ITransformingStep::DataStreamTraits getTraits() .preserves_distinct_columns = false, .returns_single_stream = false, .preserves_number_of_streams = false, + .preserves_number_of_rows = false, /// New rows are added from delayed stream }; } diff --git 
a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp index d8163cfa1ca..3083d0e0834 100644 --- a/src/Processors/QueryPlan/AggregatingStep.cpp +++ b/src/Processors/QueryPlan/AggregatingStep.cpp @@ -14,6 +14,7 @@ static ITransformingStep::DataStreamTraits getTraits() .preserves_distinct_columns = false, /// Actually, we may check that distinct names are in aggregation keys .returns_single_stream = true, .preserves_number_of_streams = false, + .preserves_number_of_rows = false, }; } diff --git a/src/Processors/QueryPlan/ConvertingStep.cpp b/src/Processors/QueryPlan/ConvertingStep.cpp index 4713a3c4402..8cc7bc0f179 100644 --- a/src/Processors/QueryPlan/ConvertingStep.cpp +++ b/src/Processors/QueryPlan/ConvertingStep.cpp @@ -13,6 +13,7 @@ static ITransformingStep::DataStreamTraits getTraits() .preserves_distinct_columns = true, .returns_single_stream = false, .preserves_number_of_streams = true, + .preserves_number_of_rows = true, }; } diff --git a/src/Processors/QueryPlan/CreatingSetsStep.cpp b/src/Processors/QueryPlan/CreatingSetsStep.cpp index a61900662c5..9194cef7c07 100644 --- a/src/Processors/QueryPlan/CreatingSetsStep.cpp +++ b/src/Processors/QueryPlan/CreatingSetsStep.cpp @@ -13,6 +13,7 @@ static ITransformingStep::DataStreamTraits getTraits() .preserves_distinct_columns = true, .returns_single_stream = false, .preserves_number_of_streams = true, + .preserves_number_of_rows = true, }; } diff --git a/src/Processors/QueryPlan/CubeStep.cpp b/src/Processors/QueryPlan/CubeStep.cpp index b9f826a5ac7..34c93a339c0 100644 --- a/src/Processors/QueryPlan/CubeStep.cpp +++ b/src/Processors/QueryPlan/CubeStep.cpp @@ -12,6 +12,7 @@ static ITransformingStep::DataStreamTraits getTraits() .preserves_distinct_columns = false, .returns_single_stream = true, .preserves_number_of_streams = false, + .preserves_number_of_rows = false, }; } diff --git a/src/Processors/QueryPlan/DistinctStep.cpp b/src/Processors/QueryPlan/DistinctStep.cpp index 
2a94754a6b0..84617ef8fdd 100644 --- a/src/Processors/QueryPlan/DistinctStep.cpp +++ b/src/Processors/QueryPlan/DistinctStep.cpp @@ -23,6 +23,7 @@ static ITransformingStep::DataStreamTraits getTraits(bool pre_distinct, bool alr .preserves_distinct_columns = already_distinct_columns, /// Will be calculated separately otherwise .returns_single_stream = !pre_distinct && !already_distinct_columns, .preserves_number_of_streams = pre_distinct || already_distinct_columns, + .preserves_number_of_rows = false, }; } diff --git a/src/Processors/QueryPlan/ExpressionStep.cpp b/src/Processors/QueryPlan/ExpressionStep.cpp index 66dd5fed31a..fb26a0a3623 100644 --- a/src/Processors/QueryPlan/ExpressionStep.cpp +++ b/src/Processors/QueryPlan/ExpressionStep.cpp @@ -15,6 +15,7 @@ static ITransformingStep::DataStreamTraits getTraits(const ExpressionActionsPtr .preserves_distinct_columns = !expression->hasJoinOrArrayJoin(), .returns_single_stream = false, .preserves_number_of_streams = true, + .preserves_number_of_rows = !expression->hasJoinOrArrayJoin(), }; } diff --git a/src/Processors/QueryPlan/ExtremesStep.cpp b/src/Processors/QueryPlan/ExtremesStep.cpp index 15575f02776..685c4349aa2 100644 --- a/src/Processors/QueryPlan/ExtremesStep.cpp +++ b/src/Processors/QueryPlan/ExtremesStep.cpp @@ -11,6 +11,7 @@ static ITransformingStep::DataStreamTraits getTraits() .preserves_distinct_columns = true, .returns_single_stream = false, .preserves_number_of_streams = true, + .preserves_number_of_rows = true, }; } diff --git a/src/Processors/QueryPlan/FillingStep.cpp b/src/Processors/QueryPlan/FillingStep.cpp index 2c06d14e6a5..7aa9acd76d0 100644 --- a/src/Processors/QueryPlan/FillingStep.cpp +++ b/src/Processors/QueryPlan/FillingStep.cpp @@ -18,6 +18,7 @@ static ITransformingStep::DataStreamTraits getTraits() .preserves_distinct_columns = false, /// TODO: it seem to actually be true. Check it later. 
.returns_single_stream = true, .preserves_number_of_streams = true, + .preserves_number_of_rows = false, }; } diff --git a/src/Processors/QueryPlan/FilterStep.cpp b/src/Processors/QueryPlan/FilterStep.cpp index 9abca58a5de..4b2437778c9 100644 --- a/src/Processors/QueryPlan/FilterStep.cpp +++ b/src/Processors/QueryPlan/FilterStep.cpp @@ -14,6 +14,7 @@ static ITransformingStep::DataStreamTraits getTraits(const ExpressionActionsPtr .preserves_distinct_columns = !expression->hasJoinOrArrayJoin(), /// I suppose it actually never happens .returns_single_stream = false, .preserves_number_of_streams = true, + .preserves_number_of_rows = false, }; } diff --git a/src/Processors/QueryPlan/FinishSortingStep.cpp b/src/Processors/QueryPlan/FinishSortingStep.cpp index 85325e07a0c..f44398c400a 100644 --- a/src/Processors/QueryPlan/FinishSortingStep.cpp +++ b/src/Processors/QueryPlan/FinishSortingStep.cpp @@ -9,13 +9,14 @@ namespace DB { -static ITransformingStep::DataStreamTraits getTraits() +static ITransformingStep::DataStreamTraits getTraits(size_t limit) { return ITransformingStep::DataStreamTraits { .preserves_distinct_columns = true, .returns_single_stream = true, .preserves_number_of_streams = false, + .preserves_number_of_rows = limit == 0, }; } @@ -25,7 +26,7 @@ FinishSortingStep::FinishSortingStep( SortDescription result_description_, size_t max_block_size_, UInt64 limit_) - : ITransformingStep(input_stream_, input_stream_.header, getTraits()) + : ITransformingStep(input_stream_, input_stream_.header, getTraits(limit_)) , prefix_description(std::move(prefix_description_)) , result_description(std::move(result_description_)) , max_block_size(max_block_size_) diff --git a/src/Processors/QueryPlan/ITransformingStep.cpp b/src/Processors/QueryPlan/ITransformingStep.cpp index 9c67fedc734..0e0b0159194 100644 --- a/src/Processors/QueryPlan/ITransformingStep.cpp +++ b/src/Processors/QueryPlan/ITransformingStep.cpp @@ -6,6 +6,7 @@ namespace DB 
ITransformingStep::ITransformingStep(DataStream input_stream, Block output_header, DataStreamTraits traits, bool collect_processors_) : collect_processors(collect_processors_) + , transform_traits(traits) { output_stream = DataStream{.header = std::move(output_header)}; diff --git a/src/Processors/QueryPlan/ITransformingStep.h b/src/Processors/QueryPlan/ITransformingStep.h index 30ff039cf39..d674ce7fdd6 100644 --- a/src/Processors/QueryPlan/ITransformingStep.h +++ b/src/Processors/QueryPlan/ITransformingStep.h @@ -23,6 +23,10 @@ public: /// Won't change the number of ports for pipeline. /// Examples: true for ExpressionStep, false for MergeSortingStep bool preserves_number_of_streams; + + /// Won't change the total number of rows. + /// Examples: true for ExpressionStep (without join or array join), false for FilterStep + bool preserves_number_of_rows; }; ITransformingStep(DataStream input_stream, Block output_header, DataStreamTraits traits, bool collect_processors_ = true); @@ -31,6 +35,8 @@ public: virtual void transformPipeline(QueryPipeline & pipeline) = 0; + const DataStreamTraits & getTransformTraits() const { return transform_traits; } + void describePipeline(FormatSettings & settings) const override; protected: @@ -41,6 +47,8 @@ private: /// We collect processors got after pipeline transformation. 
Processors processors; bool collect_processors; + + DataStreamTraits transform_traits; }; } diff --git a/src/Processors/QueryPlan/LimitByStep.cpp b/src/Processors/QueryPlan/LimitByStep.cpp index 30d22630786..b8d86e3e62f 100644 --- a/src/Processors/QueryPlan/LimitByStep.cpp +++ b/src/Processors/QueryPlan/LimitByStep.cpp @@ -13,6 +13,7 @@ static ITransformingStep::DataStreamTraits getTraits() .preserves_distinct_columns = true, .returns_single_stream = true, .preserves_number_of_streams = false, + .preserves_number_of_rows = false, }; } diff --git a/src/Processors/QueryPlan/LimitStep.cpp b/src/Processors/QueryPlan/LimitStep.cpp index 55447535f81..903c9fb076c 100644 --- a/src/Processors/QueryPlan/LimitStep.cpp +++ b/src/Processors/QueryPlan/LimitStep.cpp @@ -13,6 +13,7 @@ static ITransformingStep::DataStreamTraits getTraits() .preserves_distinct_columns = true, .returns_single_stream = false, .preserves_number_of_streams = true, + .preserves_number_of_rows = false, }; } diff --git a/src/Processors/QueryPlan/MergeSortingStep.cpp b/src/Processors/QueryPlan/MergeSortingStep.cpp index 410d68d3c01..2865624a83f 100644 --- a/src/Processors/QueryPlan/MergeSortingStep.cpp +++ b/src/Processors/QueryPlan/MergeSortingStep.cpp @@ -6,13 +6,14 @@ namespace DB { -static ITransformingStep::DataStreamTraits getTraits() +static ITransformingStep::DataStreamTraits getTraits(size_t limit) { return ITransformingStep::DataStreamTraits { .preserves_distinct_columns = true, .returns_single_stream = false, .preserves_number_of_streams = true, + .preserves_number_of_rows = limit == 0, }; } @@ -25,7 +26,7 @@ MergeSortingStep::MergeSortingStep( size_t max_bytes_before_external_sort_, VolumePtr tmp_volume_, size_t min_free_disk_space_) - : ITransformingStep(input_stream, input_stream.header, getTraits()) + : ITransformingStep(input_stream, input_stream.header, getTraits(limit_)) , description(description_) , max_merged_block_size(max_merged_block_size_) , limit(limit_) diff --git 
a/src/Processors/QueryPlan/MergingAggregatedStep.cpp b/src/Processors/QueryPlan/MergingAggregatedStep.cpp index 7e9d073e186..3f88f5151da 100644 --- a/src/Processors/QueryPlan/MergingAggregatedStep.cpp +++ b/src/Processors/QueryPlan/MergingAggregatedStep.cpp @@ -14,6 +14,7 @@ static ITransformingStep::DataStreamTraits getTraits() .preserves_distinct_columns = false, .returns_single_stream = true, .preserves_number_of_streams = false, + .preserves_number_of_rows = false, }; } diff --git a/src/Processors/QueryPlan/MergingSortedStep.cpp b/src/Processors/QueryPlan/MergingSortedStep.cpp index 50bef82910c..7aaa2ce2922 100644 --- a/src/Processors/QueryPlan/MergingSortedStep.cpp +++ b/src/Processors/QueryPlan/MergingSortedStep.cpp @@ -6,13 +6,14 @@ namespace DB { -static ITransformingStep::DataStreamTraits getTraits() +static ITransformingStep::DataStreamTraits getTraits(size_t limit) { return ITransformingStep::DataStreamTraits { .preserves_distinct_columns = true, .returns_single_stream = true, .preserves_number_of_streams = false, + .preserves_number_of_rows = limit == 0, }; } @@ -21,7 +22,7 @@ MergingSortedStep::MergingSortedStep( SortDescription sort_description_, size_t max_block_size_, UInt64 limit_) - : ITransformingStep(input_stream, input_stream.header, getTraits()) + : ITransformingStep(input_stream, input_stream.header, getTraits(limit_)) , sort_description(std::move(sort_description_)) , max_block_size(max_block_size_) , limit(limit_) diff --git a/src/Processors/QueryPlan/OffsetStep.cpp b/src/Processors/QueryPlan/OffsetStep.cpp index 79ccad26686..c79f7679040 100644 --- a/src/Processors/QueryPlan/OffsetStep.cpp +++ b/src/Processors/QueryPlan/OffsetStep.cpp @@ -13,6 +13,7 @@ static ITransformingStep::DataStreamTraits getTraits() .preserves_distinct_columns = true, .returns_single_stream = false, .preserves_number_of_streams = true, + .preserves_number_of_rows = false, }; } diff --git a/src/Processors/QueryPlan/PartialSortingStep.cpp 
b/src/Processors/QueryPlan/PartialSortingStep.cpp index 3d602169087..0805fd0f713 100644 --- a/src/Processors/QueryPlan/PartialSortingStep.cpp +++ b/src/Processors/QueryPlan/PartialSortingStep.cpp @@ -7,13 +7,14 @@ namespace DB { -static ITransformingStep::DataStreamTraits getTraits() +static ITransformingStep::DataStreamTraits getTraits(size_t limit) { return ITransformingStep::DataStreamTraits { .preserves_distinct_columns = true, .returns_single_stream = false, .preserves_number_of_streams = true, + .preserves_number_of_rows = limit == 0, }; } @@ -22,7 +23,7 @@ PartialSortingStep::PartialSortingStep( SortDescription sort_description_, UInt64 limit_, SizeLimits size_limits_) - : ITransformingStep(input_stream, input_stream.header, getTraits()) + : ITransformingStep(input_stream, input_stream.header, getTraits(limit_)) , sort_description(std::move(sort_description_)) , limit(limit_) , size_limits(size_limits_) diff --git a/src/Processors/QueryPlan/QueryPlan.cpp b/src/Processors/QueryPlan/QueryPlan.cpp index 3bc8ca1f994..0c02c9ce57d 100644 --- a/src/Processors/QueryPlan/QueryPlan.cpp +++ b/src/Processors/QueryPlan/QueryPlan.cpp @@ -315,7 +315,20 @@ static void tryPushDownLimit(QueryPlanStepPtr & parent, QueryPlanStepPtr & child if (!limit) return; + const auto * transforming = typeid_cast<const ITransformingStep *>(child.get()); + + /// Skip everything which is not transform. + if (!transforming) + return; + /// Now we should decide if pushing down limit possible for this step. + + /// Cannot push down if child changes the number of rows. + if (!transforming->getTransformTraits().preserves_number_of_rows) + return; + + + /// ExtremesStep ? 
, FinishSorting, MergeSorting, MergingSorted, PartialSorting } void QueryPlan::optimize() diff --git a/src/Processors/QueryPlan/RollupStep.cpp b/src/Processors/QueryPlan/RollupStep.cpp index 593744f0253..2a73c247822 100644 --- a/src/Processors/QueryPlan/RollupStep.cpp +++ b/src/Processors/QueryPlan/RollupStep.cpp @@ -12,6 +12,7 @@ static ITransformingStep::DataStreamTraits getTraits() .preserves_distinct_columns = false, .returns_single_stream = true, .preserves_number_of_streams = false, + .preserves_number_of_rows = false, }; } diff --git a/src/Processors/QueryPlan/TotalsHavingStep.cpp b/src/Processors/QueryPlan/TotalsHavingStep.cpp index aa1ee0bc49c..e5c650c5001 100644 --- a/src/Processors/QueryPlan/TotalsHavingStep.cpp +++ b/src/Processors/QueryPlan/TotalsHavingStep.cpp @@ -8,13 +8,14 @@ namespace DB { -static ITransformingStep::DataStreamTraits getTraits() +static ITransformingStep::DataStreamTraits getTraits(bool has_filter) { return ITransformingStep::DataStreamTraits { .preserves_distinct_columns = true, .returns_single_stream = true, .preserves_number_of_streams = false, + .preserves_number_of_rows = !has_filter, }; } @@ -29,7 +30,7 @@ TotalsHavingStep::TotalsHavingStep( : ITransformingStep( input_stream_, TotalsHavingTransform::transformHeader(input_stream_.header, expression_, final_), - getTraits()) + getTraits(!filter_column_.empty())) , overflow_row(overflow_row_) , expression(expression_) , filter_column_name(filter_column_)