From b161127fc1223c40d5225f7deac594dc4bc95179 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Tue, 3 Nov 2020 21:19:50 +0300 Subject: [PATCH 1/2] Add a test for spreading parts between threads for ReadInOrderOptimizer --- ...1551_mergetree_read_in_order_spread.reference | 11 +++++++++++ .../01551_mergetree_read_in_order_spread.sql | 16 ++++++++++++++++ 2 files changed, 27 insertions(+) create mode 100644 tests/queries/0_stateless/01551_mergetree_read_in_order_spread.reference create mode 100644 tests/queries/0_stateless/01551_mergetree_read_in_order_spread.sql diff --git a/tests/queries/0_stateless/01551_mergetree_read_in_order_spread.reference b/tests/queries/0_stateless/01551_mergetree_read_in_order_spread.reference new file mode 100644 index 00000000000..fc10b4707a9 --- /dev/null +++ b/tests/queries/0_stateless/01551_mergetree_read_in_order_spread.reference @@ -0,0 +1,11 @@ +(Expression) +ExpressionTransform + (Expression) + ExpressionTransform + (Aggregating) + FinalizingSimpleTransform + AggregatingSortedTransform 3 → 1 + AggregatingInOrderTransform × 3 + (Expression) + ExpressionTransform × 3 + (ReadFromStorage) diff --git a/tests/queries/0_stateless/01551_mergetree_read_in_order_spread.sql b/tests/queries/0_stateless/01551_mergetree_read_in_order_spread.sql new file mode 100644 index 00000000000..831a7282861 --- /dev/null +++ b/tests/queries/0_stateless/01551_mergetree_read_in_order_spread.sql @@ -0,0 +1,16 @@ +DROP TABLE IF EXISTS data_01551; + +CREATE TABLE data_01551 +( + key UInt32 +) engine=AggregatingMergeTree() +PARTITION BY key%2 +ORDER BY (key, key/2) +SETTINGS index_granularity=10; + +INSERT INTO data_01551 SELECT number FROM numbers(100000); +SET max_threads=3; +SET merge_tree_min_rows_for_concurrent_read=10000; +SET optimize_aggregation_in_order=1; +SET read_in_order_two_level_merge_threshold=1; +EXPLAIN PIPELINE SELECT key FROM data_01551 GROUP BY key, key/2; From 2389406c21848d07da7f2fc670a24612c018f6e4 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Tue, 3 Nov 2020 21:19:50 +0300 Subject: [PATCH 2/2] Fix spreading for ReadInOrderOptimizer with expression in ORDER BY This will fix optimize_read_in_order/optimize_aggregation_in_order with max_threads>0 and expression in ORDER BY --- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 56 ++++++++++++++----- 1 file changed, 43 insertions(+), 13 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index f06bfb97b2c..a38d50e56fb 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -82,6 +82,17 @@ static Block getBlockWithPartColumn(const MergeTreeData::DataPartsVector & parts return Block{ColumnWithTypeAndName(std::move(column), std::make_shared(), "_part")}; } +/// Check if ORDER BY clause of the query has some expression. +static bool sortingDescriptionHasExpressions(const SortDescription & sort_description, const StorageMetadataPtr & metadata_snapshot) +{ + auto all_columns = metadata_snapshot->getColumns(); + for (const auto & sort_column : sort_description) + { + if (!all_columns.has(sort_column.column_name)) + return true; + } + return false; +} size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead( const MergeTreeData::DataPartsVector & parts, @@ -1065,6 +1076,7 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( const size_t min_marks_per_stream = (sum_marks - 1) / num_streams + 1; bool need_preliminary_merge = (parts.size() > settings.read_in_order_two_level_merge_threshold); + size_t max_output_ports = 0; for (size_t i = 0; i < num_streams && !parts.empty(); ++i) { @@ -1174,25 +1186,43 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( }); } - if (pipe.numOutputPorts() > 1 && need_preliminary_merge) + max_output_ports = std::max(pipe.numOutputPorts(), max_output_ports); + res.emplace_back(std::move(pipe)); + } + + if (need_preliminary_merge) + { + /// If ORDER BY clause of the query contains some expression, + /// then those new columns should be added for the merge step, + /// and this should be done always, if there is at least one pipe that + /// has multiple output ports. + bool sorting_key_has_expression = sortingDescriptionHasExpressions(input_order_info->order_key_prefix_descr, metadata_snapshot); + bool force_sorting_key_transform = res.size() > 1 && max_output_ports > 1 && sorting_key_has_expression; + + for (auto & pipe : res) { SortDescription sort_description; - for (size_t j = 0; j < input_order_info->order_key_prefix_descr.size(); ++j) - sort_description.emplace_back(metadata_snapshot->getSortingKey().column_names[j], - input_order_info->direction, 1); - /// Drop temporary columns, added by 'sorting_key_prefix_expr' - out_projection = createProjection(pipe, data); - pipe.addSimpleTransform([sorting_key_prefix_expr](const Block & header) + if (pipe.numOutputPorts() > 1 || force_sorting_key_transform) { - return std::make_shared(header, sorting_key_prefix_expr); - }); + for (size_t j = 0; j < input_order_info->order_key_prefix_descr.size(); ++j) + sort_description.emplace_back(metadata_snapshot->getSortingKey().column_names[j], + input_order_info->direction, 1); - pipe.addTransform(std::make_shared( - pipe.getHeader(), pipe.numOutputPorts(), sort_description, max_block_size)); + /// Drop temporary columns, added by 'sorting_key_prefix_expr' + out_projection = createProjection(pipe, data); + pipe.addSimpleTransform([sorting_key_prefix_expr](const Block & header) + { + return std::make_shared(header, sorting_key_prefix_expr); + }); + } + + if (pipe.numOutputPorts() > 1) + { + pipe.addTransform(std::make_shared( + pipe.getHeader(), pipe.numOutputPorts(), sort_description, max_block_size)); + } } - - res.emplace_back(std::move(pipe)); } return Pipe::unitePipes(std::move(res));