diff --git a/src/Processors/QueryPlan/IQueryPlanStep.h b/src/Processors/QueryPlan/IQueryPlanStep.h
index a608c6f8058..df5c13a7f3b 100644
--- a/src/Processors/QueryPlan/IQueryPlanStep.h
+++ b/src/Processors/QueryPlan/IQueryPlanStep.h
@@ -16,6 +16,11 @@ using Processors = std::vector<ProcessorPtr>;
 
 namespace JSONBuilder { class JSONMap; }
 
+namespace ErrorCodes
+{
+    extern const int NOT_IMPLEMENTED;
+}
+
 /// Description of data stream.
 /// Single logical data stream may relate to many ports of pipeline.
 class DataStream
@@ -107,7 +112,30 @@ public:
     /// Append extra processors for this step.
     void appendExtraProcessors(const Processors & extra_processors);
 
+    /// Updates the input streams of the given step. Used during query plan optimizations.
+    /// It won't do any validation of new streams, so it is your responsibility to ensure that this update doesn't break anything
+    /// (e.g. you update data stream traits or correctly remove / add columns).
+    void updateInputStreams(DataStreams input_streams_)
+    {
+        chassert(canUpdateInputStream());
+        input_streams = std::move(input_streams_);
+        updateOutputStream();
+    }
+
+    void updateInputStream(DataStream input_stream) { updateInputStreams(DataStreams{std::move(input_stream)}); }
+
+    void updateInputStream(DataStream input_stream, size_t idx)
+    {
+        chassert(canUpdateInputStream() && idx < input_streams.size());
+        input_streams[idx] = std::move(input_stream);
+        updateOutputStream();
+    }
+
+    virtual bool canUpdateInputStream() const { return false; }
+
 protected:
+    virtual void updateOutputStream() { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented"); }
+
     DataStreams input_streams;
     std::optional<DataStream> output_stream;
 
diff --git a/src/Processors/QueryPlan/ITransformingStep.h b/src/Processors/QueryPlan/ITransformingStep.h
index 77de668fbdb..32bf3b6af90 100644
--- a/src/Processors/QueryPlan/ITransformingStep.h
+++ b/src/Processors/QueryPlan/ITransformingStep.h
@@ -55,17 +55,6 @@ public:
     const TransformTraits & getTransformTraits() const { return transform_traits; }
     const DataStreamTraits & getDataStreamTraits() const { return data_stream_traits; }
 
-    /// Updates the input stream of the given step. Used during query plan optimizations.
-    /// It won't do any validation of a new stream, so it is your responsibility to ensure that this update doesn't break anything
-    /// (e.g. you update data stream traits or correctly remove / add columns).
-    void updateInputStream(DataStream input_stream)
-    {
-        input_streams.clear();
-        input_streams.emplace_back(std::move(input_stream));
-
-        updateOutputStream();
-    }
-
     void describePipeline(FormatSettings & settings) const override;
 
     /// Enforcement is supposed to be done through the special settings that will be taken into account by remote nodes during query planning (e.g. force_aggregation_in_order).
@@ -75,6 +64,8 @@ public:
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
     }
 
+    bool canUpdateInputStream() const override { return true; }
+
 protected:
     /// Create output stream from header and traits.
     static DataStream createOutputStream(
@@ -85,8 +76,6 @@ protected:
     TransformTraits transform_traits;
 
 private:
-    virtual void updateOutputStream() = 0;
-
     /// If we should collect processors got after pipeline transformation.
     bool collect_processors;
 
diff --git a/src/Processors/QueryPlan/JoinStep.cpp b/src/Processors/QueryPlan/JoinStep.cpp
index 63a5eeb51d2..9ac0f18d2c3 100644
--- a/src/Processors/QueryPlan/JoinStep.cpp
+++ b/src/Processors/QueryPlan/JoinStep.cpp
@@ -24,11 +24,7 @@ JoinStep::JoinStep(
     bool keep_left_read_in_order_)
     : join(std::move(join_)), max_block_size(max_block_size_), max_streams(max_streams_), keep_left_read_in_order(keep_left_read_in_order_)
 {
-    input_streams = {left_stream_, right_stream_};
-    output_stream = DataStream
-    {
-        .header = JoiningTransform::transformHeader(left_stream_.header, join),
-    };
+    updateInputStreams(DataStreams{left_stream_, right_stream_});
 }
 
 QueryPipelineBuilderPtr JoinStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &)
@@ -95,20 +91,12 @@ void JoinStep::describeActions(JSONBuilder::JSONMap & map) const
         map.add("Clauses", table_join.formatClauses(table_join.getClauses(), true /*short_format*/));
 }
 
-void JoinStep::updateInputStream(const DataStream & new_input_stream_, size_t idx)
+void JoinStep::updateOutputStream()
 {
-    if (idx == 0)
+    output_stream = DataStream
     {
-        input_streams = {new_input_stream_, input_streams.at(1)};
-        output_stream = DataStream
-        {
-            .header = JoiningTransform::transformHeader(new_input_stream_.header, join),
-        };
-    }
-    else
-    {
-        input_streams = {input_streams.at(0), new_input_stream_};
-    }
+        .header = JoiningTransform::transformHeader(input_streams[0].header, join),
+    };
 }
 
 static ITransformingStep::Traits getStorageJoinTraits()
diff --git a/src/Processors/QueryPlan/JoinStep.h b/src/Processors/QueryPlan/JoinStep.h
index 369ee9bec8b..5bfaa41f9b6 100644
--- a/src/Processors/QueryPlan/JoinStep.h
+++ b/src/Processors/QueryPlan/JoinStep.h
@@ -33,9 +33,11 @@ public:
     const JoinPtr & getJoin() const { return join; }
     bool allowPushDownToRight() const;
 
-    void updateInputStream(const DataStream & new_input_stream_, size_t idx);
+    bool canUpdateInputStream() const override { return true; }
 
 private:
+    void updateOutputStream() override;
+
     JoinPtr join;
     size_t max_block_size;
     size_t max_streams;
diff --git a/src/Processors/QueryPlan/QueryPlan.cpp b/src/Processors/QueryPlan/QueryPlan.cpp
index 5d38bfb42c4..ec82c233ce4 100644
--- a/src/Processors/QueryPlan/QueryPlan.cpp
+++ b/src/Processors/QueryPlan/QueryPlan.cpp
@@ -460,16 +460,24 @@ static void updateDataStreams(QueryPlan::Node & root)
 
         static bool visitTopDownImpl(QueryPlan::Node * /*current_node*/, QueryPlan::Node * /*parent_node*/) { return true; }
 
-        static void visitBottomUpImpl(QueryPlan::Node * current_node, QueryPlan::Node * parent_node)
+        static void visitBottomUpImpl(QueryPlan::Node * current_node, QueryPlan::Node * /*parent_node*/)
         {
-            if (!parent_node || parent_node->children.size() != 1)
+            auto & current_step = *current_node->step;
+            if (!current_step.canUpdateInputStream() || current_node->children.empty())
                 return;
 
-            if (!current_node->step->hasOutputStream())
-                return;
+            for (const auto * child : current_node->children)
+            {
+                if (!child->step->hasOutputStream())
+                    return;
+            }
 
-            if (auto * parent_transform_step = dynamic_cast<ITransformingStep *>(parent_node->step.get()); parent_transform_step)
-                parent_transform_step->updateInputStream(current_node->step->getOutputStream());
+            DataStreams streams;
+            streams.reserve(current_node->children.size());
+            for (const auto * child : current_node->children)
+                streams.emplace_back(child->step->getOutputStream());
+
+            current_step.updateInputStreams(std::move(streams));
         }
     };
 
diff --git a/src/Processors/QueryPlan/UnionStep.cpp b/src/Processors/QueryPlan/UnionStep.cpp
index e111890a833..dde12271de1 100644
--- a/src/Processors/QueryPlan/UnionStep.cpp
+++ b/src/Processors/QueryPlan/UnionStep.cpp
@@ -30,18 +30,16 @@ UnionStep::UnionStep(DataStreams input_streams_, size_t max_threads_)
     : header(checkHeaders(input_streams_))
     , max_threads(max_threads_)
 {
-    input_streams = std::move(input_streams_);
+    updateInputStreams(std::move(input_streams_));
+}
 
+void UnionStep::updateOutputStream()
+{
     if (input_streams.size() == 1)
         output_stream = input_streams.front();
     else
         output_stream = DataStream{.header = header};
 
-    updateOutputSortDescription();
-}
-
-void UnionStep::updateOutputSortDescription()
-{
     SortDescription common_sort_description = input_streams.front().sort_description;
     DataStream::SortScope sort_scope = input_streams.front().sort_scope;
     for (const auto & input_stream : input_streams)
diff --git a/src/Processors/QueryPlan/UnionStep.h b/src/Processors/QueryPlan/UnionStep.h
index 6278de07673..4ab08785b01 100644
--- a/src/Processors/QueryPlan/UnionStep.h
+++ b/src/Processors/QueryPlan/UnionStep.h
@@ -19,9 +19,11 @@ public:
 
     size_t getMaxThreads() const { return max_threads; }
 
-    void updateOutputSortDescription();
+    bool canUpdateInputStream() const override { return true; }
 
 private:
+    void updateOutputStream() override;
+
     Block header;
     size_t max_threads;
 };
diff --git a/tests/queries/0_stateless/02876_sort_union_of_sorted.reference b/tests/queries/0_stateless/02876_sort_union_of_sorted.reference
new file mode 100644
index 00000000000..f3af221a036
--- /dev/null
+++ b/tests/queries/0_stateless/02876_sort_union_of_sorted.reference
@@ -0,0 +1,63 @@
+1..20:
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+11
+12
+13
+14
+15
+16
+17
+18
+19
+20
+20..1:
+20
+19
+18
+17
+16
+15
+14
+13
+12
+11
+10
+9
+8
+7
+6
+5
+4
+3
+2
+1
+20..1:
+20
+19
+18
+17
+16
+15
+14
+13
+12
+11
+10
+9
+8
+7
+6
+5
+4
+3
+2
+1
diff --git a/tests/queries/0_stateless/02876_sort_union_of_sorted.sql b/tests/queries/0_stateless/02876_sort_union_of_sorted.sql
new file mode 100644
index 00000000000..23d3772bc82
--- /dev/null
+++ b/tests/queries/0_stateless/02876_sort_union_of_sorted.sql
@@ -0,0 +1,20 @@
+DROP TABLE IF EXISTS table1;
+DROP TABLE IF EXISTS table2;
+
+CREATE TABLE table1 (number UInt64) ENGINE=MergeTree ORDER BY tuple();
+CREATE TABLE table2 (number UInt64) ENGINE=MergeTree ORDER BY tuple();
+
+INSERT INTO table1 SELECT number FROM numbers_mt(1, 10);
+INSERT INTO table2 SELECT number FROM numbers_mt(11, 10);
+
+SELECT '1..20:';
+SELECT * FROM ((SELECT * FROM table1 ORDER BY number) UNION ALL (SELECT * FROM table2 ORDER BY number)) ORDER BY number;
+
+SELECT '20..1:';
+SELECT * FROM ((SELECT * FROM table1 ORDER BY number) UNION ALL (SELECT * FROM table2 ORDER BY number)) ORDER BY number DESC;
+
+SELECT '20..1:';
+SELECT * FROM ((SELECT * FROM table1 ORDER BY number DESC) UNION ALL (SELECT * FROM table2 ORDER BY number DESC)) ORDER BY number DESC;
+
+DROP TABLE table1;
+DROP TABLE table2;