From cbf9304d1f91b3861bfb270da0b413be51fd375d Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Mon, 25 Dec 2023 14:16:51 +0300 Subject: [PATCH 1/2] MergeTree automatically derive do_not_merge_across_partitions_select_final setting --- .../QueryPlan/ReadFromMergeTree.cpp | 37 ++++++++++++++++++- src/Processors/QueryPlan/ReadFromMergeTree.h | 2 + 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 875b0d9bdbc..aa3e42a3a64 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -1042,6 +1042,38 @@ static void addMergingFinal( pipe.addTransform(get_merging_processor()); } +bool ReadFromMergeTree::doNotMergePartsAcrossPartitionsFinal() const +{ + const auto & settings = context->getSettingsRef(); + + /// If setting do_not_merge_across_partitions_select_final is set always prefer it + if (settings.do_not_merge_across_partitions_select_final.changed) + return settings.do_not_merge_across_partitions_select_final; + + if (!metadata_for_reading->hasPrimaryKey() || !metadata_for_reading->hasPartitionKey()) + return false; + + /** To avoid merging parts across partitions we want result of partition key expression for + * rows with same primary key to be the same. + * + * If partition key expression is deterministic, and contains only columns that are included + * in primary key, then for same primary key column values, result of partition key expression + * will be the same. + */ + const auto & partition_key_expression = metadata_for_reading->getPartitionKey().expression; + if (partition_key_expression->getActionsDAG().hasNonDeterministic()) + return false; + + const auto & primary_key_columns = metadata_for_reading->getPrimaryKey().column_names; + NameSet primary_key_columns_set(primary_key_columns.begin(), primary_key_columns.end()); + + const auto & partition_key_required_columns = partition_key_expression->getRequiredColumns(); + for (const auto & partition_key_required_column : partition_key_required_columns) + if (!primary_key_columns_set.contains(partition_key_required_column)) + return false; + + return true; +} Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( RangesInDataParts && parts_with_ranges, size_t num_streams, const Names & origin_column_names, const Names & column_names, ActionsDAGPtr & out_projection) @@ -1064,7 +1096,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( auto it = parts_with_ranges.begin(); parts_to_merge_ranges.push_back(it); - if (settings.do_not_merge_across_partitions_select_final) + bool do_not_merge_across_partitions_select_final = doNotMergePartsAcrossPartitionsFinal(); + if (do_not_merge_across_partitions_select_final) { while (it != parts_with_ranges.end()) { @@ -1097,7 +1130,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( /// If do_not_merge_across_partitions_select_final is true and there is only one part in partition /// with level > 0 then we won't post-process this part, and if num_streams > 1 we /// can use parallel select on such parts. - bool no_merging_final = settings.do_not_merge_across_partitions_select_final && + bool no_merging_final = do_not_merge_across_partitions_select_final && std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 && parts_to_merge_ranges[range_index]->data_part->info.level > 0; Pipes pipes; diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index 4e38e06c6af..9919352ef72 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -288,6 +288,8 @@ private: ActionsDAGPtr & out_projection, const InputOrderInfoPtr & input_order_info); + bool doNotMergePartsAcrossPartitionsFinal() const; + Pipe spreadMarkRangesAmongStreamsFinal( RangesInDataParts && parts, size_t num_streams, const Names & origin_column_names, const Names & column_names, ActionsDAGPtr & out_projection); From 71921086ae048a1a4d07ada086b13f54f0dd1172 Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Tue, 26 Dec 2023 12:40:54 +0300 Subject: [PATCH 2/2] Fixed tests --- tests/queries/0_stateless/02286_parallel_final.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/queries/0_stateless/02286_parallel_final.sh b/tests/queries/0_stateless/02286_parallel_final.sh index de0cca0e966..788b4c0e9b5 100755 --- a/tests/queries/0_stateless/02286_parallel_final.sh +++ b/tests/queries/0_stateless/02286_parallel_final.sh @@ -11,7 +11,7 @@ test_random_values() { create table tbl_8parts_${layers}granules_rnd (key1 UInt32, sign Int8) engine = CollapsingMergeTree(sign) order by (key1) partition by (key1 % 8); insert into tbl_8parts_${layers}granules_rnd select number, 1 from numbers_mt($((layers * 8 * 8192))); optimize table tbl_8parts_${layers}granules_rnd final; - explain pipeline select * from tbl_8parts_${layers}granules_rnd final settings max_threads = 16;" 2>&1 | + explain pipeline select * from tbl_8parts_${layers}granules_rnd final settings max_threads = 16, do_not_merge_across_partitions_select_final = 0;;" 2>&1 | grep -c "CollapsingSortedTransform" } @@ -25,7 +25,7 @@ test_sequential_values() { create table tbl_8parts_${layers}granules_seq (key1 UInt32, sign Int8) engine = CollapsingMergeTree(sign) order by (key1) partition by (key1 / $((layers * 8192)))::UInt64; insert into tbl_8parts_${layers}granules_seq select number, 1 from numbers_mt($((layers * 8 * 8192))); optimize table tbl_8parts_${layers}granules_seq final; - explain pipeline select * from tbl_8parts_${layers}granules_seq final settings max_threads = 8;" 2>&1 | + explain pipeline select * from tbl_8parts_${layers}granules_seq final settings max_threads = 8, do_not_merge_across_partitions_select_final = 0;" 2>&1 | grep -c "CollapsingSortedTransform" }