diff --git a/src/Processors/QueryPlan/Optimizations/optimizeReadInOrder.cpp b/src/Processors/QueryPlan/Optimizations/optimizeReadInOrder.cpp index ac7fcdcf83f..c41122c26b2 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeReadInOrder.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeReadInOrder.cpp @@ -820,6 +820,8 @@ InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & n bool can_read = reading->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit); if (!can_read) return nullptr; + + reading->enableVirtualRow(); } return order_info; diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 43b034b476a..7a297f6db3b 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -1099,9 +1099,11 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder( for (auto && item : splitted_parts_and_ranges) { - /// enable_virtual_row = true means a MergingSortedTransform should occur. - /// If so, adding a virtual row might speedup in the case of multiple parts. - enable_virtual_row = (need_preliminary_merge || output_each_partition_through_separate_port) && item.size() > 1; + /// If not enabled before, try to enable it when conditions meet as in the following section of preliminary merge, + /// only ExpressionTransform is added between MergingSortedTransform and readFromMergeTree. + if (!enable_virtual_row) + enable_virtual_row = (need_preliminary_merge || output_each_partition_through_separate_port) && item.size() > 1; + pipes.emplace_back(readInOrder(std::move(item), column_names, pool_settings, read_type, input_order_info->limit)); } } diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index b43217db598..ccb56c3f31a 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -210,6 +210,8 @@ public: void applyFilters(ActionDAGNodes added_filter_nodes) override; + void enableVirtualRow() { enable_virtual_row = true; } + private: int getSortDirection() const { diff --git a/tests/queries/0_stateless/03031_read_in_order_optimization_with_virtual_row.reference b/tests/queries/0_stateless/03031_read_in_order_optimization_with_virtual_row.reference index 3c3a9cf532e..7106ddc157c 100644 --- a/tests/queries/0_stateless/03031_read_in_order_optimization_with_virtual_row.reference +++ b/tests/queries/0_stateless/03031_read_in_order_optimization_with_virtual_row.reference @@ -10,6 +10,18 @@ 16388 24576 ======== +0 +1 +2 +3 +16384 +======== +16385 +16386 +16387 +16388 +24578 +======== 1 2 1 2 1 3 diff --git a/tests/queries/0_stateless/03031_read_in_order_optimization_with_virtual_row.sql b/tests/queries/0_stateless/03031_read_in_order_optimization_with_virtual_row.sql index 688e427d19d..5bae739bc51 100644 --- a/tests/queries/0_stateless/03031_read_in_order_optimization_with_virtual_row.sql +++ b/tests/queries/0_stateless/03031_read_in_order_optimization_with_virtual_row.sql @@ -39,14 +39,14 @@ SETTINGS max_block_size = 8192, read_in_order_two_level_merge_threshold = 0, --force preliminary merge max_threads = 1, optimize_read_in_order = 1, -log_comment = 'no filter'; +log_comment = 'preliminary merge, no filter'; SYSTEM FLUSH LOGS; SELECT read_rows FROM system.query_log WHERE current_database = currentDatabase() -AND log_comment = 'no filter' +AND log_comment = 'preliminary merge, no filter' AND type = 'QueryFinish' ORDER BY query_start_time DESC limit 1; @@ -63,18 +63,68 @@ SETTINGS max_block_size = 8192, read_in_order_two_level_merge_threshold = 0, --force preliminary merge max_threads = 1, optimize_read_in_order = 1, -log_comment = 'with filter'; +log_comment = 'preliminary merge with filter'; SYSTEM FLUSH LOGS; SELECT read_rows FROM system.query_log WHERE current_database = currentDatabase() -AND log_comment = 'with filter' +AND log_comment = 'preliminary merge with filter' AND type = 'QueryFinish' ORDER BY query_start_time DESC LIMIT 1; +SELECT '========'; +-- Expecting 2 virtual rows + one chunk (8192) for result + one extra chunk for next consumption in merge transform (8192), +-- both chunks come from the same part. +SELECT x +FROM t +ORDER BY x ASC +LIMIT 4 +SETTINGS max_block_size = 8192, +read_in_order_two_level_merge_threshold = 5, --avoid preliminary merge +max_threads = 1, +optimize_read_in_order = 1, +log_comment = 'no preliminary merge, no filter'; + +SYSTEM FLUSH LOGS; + +SELECT read_rows +FROM system.query_log +WHERE current_database = currentDatabase() +AND log_comment = 'no preliminary merge, no filter' +AND type = 'QueryFinish' +ORDER BY query_start_time DESC +LIMIT 1; + +SELECT '========'; +-- Expecting 2 virtual rows + two chunks (8192*2) get filtered out + one chunk for result (8192), +-- all chunks come from the same part. +SELECT k +FROM t +WHERE k > 8192 * 2 +ORDER BY x ASC +LIMIT 4 +SETTINGS max_block_size = 8192, +read_in_order_two_level_merge_threshold = 5, --avoid preliminary merge +read_in_order_use_buffering = false, --avoid buffer +max_threads = 1, +optimize_read_in_order = 1, +log_comment = 'no preliminary merge, with filter'; + +SYSTEM FLUSH LOGS; + +SELECT read_rows +FROM system.query_log +WHERE current_database = currentDatabase() +AND log_comment = 'no preliminary merge, with filter' +AND type = 'QueryFinish' +ORDER BY query_start_time DESC +LIMIT 1; + +DROP TABLE t; + SELECT '========'; -- from 02149_read_in_order_fixed_prefix DROP TABLE IF EXISTS fixed_prefix;