diff --git a/src/Processors/QueryPlan/Optimizations/optimizeReadInOrder.cpp b/src/Processors/QueryPlan/Optimizations/optimizeReadInOrder.cpp index e7468a3a3f2..d3ecb3cac6b 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeReadInOrder.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeReadInOrder.cpp @@ -94,6 +94,17 @@ static QueryPlan::Node * findReadingStep(QueryPlan::Node & node, StepStack & bac return nullptr; } +static bool checkVirtualRowSupport(const StepStack & backward_path) +{ + for (size_t i = 0; i < backward_path.size() - 1; i++) + { + IQueryPlanStep * step = backward_path[i]; + if (!typeid_cast(step) && !typeid_cast(step)) + return false; + } + return true; +} + void updateStepsDataStreams(StepStack & steps_to_update) { /// update data stream's sorting properties for found transforms @@ -825,8 +836,10 @@ InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & n if (!can_read) return nullptr; - if (!order_info->first_prefix_fixed) - reading->enableVirtualRow(); + if (!checkVirtualRowSupport(backward_path)) + reading->setVirtualRowStatus(ReadFromMergeTree::VirtualRowStatus::No); + else if (!order_info->first_prefix_fixed) + reading->setVirtualRowStatus(ReadFromMergeTree::VirtualRowStatus::Possible); } return order_info; diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 45dcb4616b1..2ac663e0680 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -1109,17 +1109,18 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder( } } - /// If enabled in the optimization stage, check whether there are more than one branch. - if (enable_virtual_row) - enable_virtual_row = splitted_parts_and_ranges.size() > 1 - || (splitted_parts_and_ranges.size() == 1 && splitted_parts_and_ranges[0].size() > 1); + /// If possible in the optimization stage, check whether there are more than one branch. + if (virtual_row_status == VirtualRowStatus::Possible) + virtual_row_status = splitted_parts_and_ranges.size() > 1 + || (splitted_parts_and_ranges.size() == 1 && splitted_parts_and_ranges[0].size() > 1) + ? VirtualRowStatus::Yes : VirtualRowStatus::NoConsiderInLogicalPlan; for (auto && item : splitted_parts_and_ranges) { - /// If not enabled before, try to enable it when conditions meet, as in the following section of preliminary merge, - /// only ExpressionTransform is added between MergingSortedTransform and readFromMergeTree. - bool enable_current_virtual_row = enable_virtual_row; - if (!enable_virtual_row) + bool enable_current_virtual_row = false; + if (virtual_row_status == VirtualRowStatus::Yes) + enable_current_virtual_row = true; + else if (virtual_row_status == VirtualRowStatus::NoConsiderInLogicalPlan) enable_current_virtual_row = (need_preliminary_merge || output_each_partition_through_separate_port) && item.size() > 1; pipes.emplace_back(readInOrder(std::move(item), column_names, pool_settings, read_type, input_order_info->limit, diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index aa1b9dcfdcb..767fcf3b0f8 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -108,6 +108,14 @@ public: using AnalysisResultPtr = std::shared_ptr; + enum class VirtualRowStatus + { + NoConsiderInLogicalPlan, + Possible, + No, + Yes, + }; + ReadFromMergeTree( MergeTreeData::DataPartsVector parts_, MergeTreeData::MutationsSnapshotPtr mutations_snapshot_, @@ -210,7 +218,7 @@ public: void applyFilters(ActionDAGNodes added_filter_nodes) override; - void enableVirtualRow() { enable_virtual_row = true; } + void setVirtualRowStatus(VirtualRowStatus virtual_row_status_) { virtual_row_status = virtual_row_status_; } private: int getSortDirection() const @@ -284,7 +292,9 @@ private: std::optional read_task_callback; bool enable_vertical_final = false; bool enable_remove_parts_from_snapshot_optimization = true; - bool enable_virtual_row = false; + + VirtualRowStatus virtual_row_status = VirtualRowStatus::NoConsiderInLogicalPlan; + std::optional number_of_current_replica; }; diff --git a/tests/queries/0_stateless/03031_read_in_order_optimization_with_virtual_row.reference b/tests/queries/0_stateless/03031_read_in_order_optimization_with_virtual_row.reference index 08dabf3ee06..499ac19d374 100644 --- a/tests/queries/0_stateless/03031_read_in_order_optimization_with_virtual_row.reference +++ b/tests/queries/0_stateless/03031_read_in_order_optimization_with_virtual_row.reference @@ -38,3 +38,5 @@ 1 3 1 2 1 1 +-- test distinct ---- +0 diff --git a/tests/queries/0_stateless/03031_read_in_order_optimization_with_virtual_row.sql b/tests/queries/0_stateless/03031_read_in_order_optimization_with_virtual_row.sql index 7e3af6c057a..4c7bc5d17c7 100644 --- a/tests/queries/0_stateless/03031_read_in_order_optimization_with_virtual_row.sql +++ b/tests/queries/0_stateless/03031_read_in_order_optimization_with_virtual_row.sql @@ -182,3 +182,35 @@ optimize_read_in_order = 1, read_in_order_two_level_merge_threshold = 0; --force preliminary merge DROP TABLE function_pk; + +-- modified from 02317_distinct_in_order_optimization +SELECT '-- test distinct ----'; + +DROP TABLE IF EXISTS distinct_in_order SYNC; + +CREATE TABLE distinct_in_order +( + `a` int, + `b` int, + `c` int +) +ENGINE = MergeTree +ORDER BY (a, b) +SETTINGS index_granularity = 8192, index_granularity_bytes = '10Mi'; + +SYSTEM STOP MERGES distinct_in_order; + +INSERT INTO distinct_in_order SELECT + number % number, + number % 5, + number % 10 +FROM numbers(1, 1000000); + +SELECT DISTINCT a +FROM distinct_in_order +ORDER BY a ASC +SETTINGS read_in_order_two_level_merge_threshold = 0, +optimize_read_in_order = 1, +max_threads = 2; + +DROP TABLE distinct_in_order;