Merge pull request #58218 from kitaisreal/merge-tree-automatically-derive-do-not-merge-across-partitions-select-final-setting

MergeTree derive do_not_merge_across_partitions_select_final setting
This commit is contained in:
Alexey Milovidov 2023-12-27 13:26:17 +01:00 committed by GitHub
commit 77ec4a0422
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 39 additions and 4 deletions

View File

@ -1042,6 +1042,38 @@ static void addMergingFinal(
pipe.addTransform(get_merging_processor());
}
bool ReadFromMergeTree::doNotMergePartsAcrossPartitionsFinal() const
{
const auto & settings = context->getSettingsRef();
/// If setting do_not_merge_across_partitions_select_final is set always prefer it
if (settings.do_not_merge_across_partitions_select_final.changed)
return settings.do_not_merge_across_partitions_select_final;
if (!metadata_for_reading->hasPrimaryKey() || !metadata_for_reading->hasPartitionKey())
return false;
/** To avoid merging parts across partitions we want result of partition key expression for
* rows with same primary key to be the same.
*
* If partition key expression is deterministic, and contains only columns that are included
* in primary key, then for same primary key column values, result of partition key expression
* will be the same.
*/
const auto & partition_key_expression = metadata_for_reading->getPartitionKey().expression;
if (partition_key_expression->getActionsDAG().hasNonDeterministic())
return false;
const auto & primary_key_columns = metadata_for_reading->getPrimaryKey().column_names;
NameSet primary_key_columns_set(primary_key_columns.begin(), primary_key_columns.end());
const auto & partition_key_required_columns = partition_key_expression->getRequiredColumns();
for (const auto & partition_key_required_column : partition_key_required_columns)
if (!primary_key_columns_set.contains(partition_key_required_column))
return false;
return true;
}
Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts_with_ranges, size_t num_streams, const Names & origin_column_names, const Names & column_names, ActionsDAGPtr & out_projection)
@ -1064,7 +1096,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
auto it = parts_with_ranges.begin();
parts_to_merge_ranges.push_back(it);
if (settings.do_not_merge_across_partitions_select_final)
bool do_not_merge_across_partitions_select_final = doNotMergePartsAcrossPartitionsFinal();
if (do_not_merge_across_partitions_select_final)
{
while (it != parts_with_ranges.end())
{
@ -1097,7 +1130,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
/// If do_not_merge_across_partitions_select_final is true and there is only one part in partition
/// with level > 0 then we won't post-process this part, and if num_streams > 1 we
/// can use parallel select on such parts.
bool no_merging_final = settings.do_not_merge_across_partitions_select_final &&
bool no_merging_final = do_not_merge_across_partitions_select_final &&
std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
parts_to_merge_ranges[range_index]->data_part->info.level > 0;
Pipes pipes;

View File

@ -289,6 +289,8 @@ private:
ActionsDAGPtr & out_projection,
const InputOrderInfoPtr & input_order_info);
bool doNotMergePartsAcrossPartitionsFinal() const;
Pipe spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts, size_t num_streams, const Names & origin_column_names, const Names & column_names, ActionsDAGPtr & out_projection);

View File

@ -11,7 +11,7 @@ test_random_values() {
create table tbl_8parts_${layers}granules_rnd (key1 UInt32, sign Int8) engine = CollapsingMergeTree(sign) order by (key1) partition by (key1 % 8);
insert into tbl_8parts_${layers}granules_rnd select number, 1 from numbers_mt($((layers * 8 * 8192)));
optimize table tbl_8parts_${layers}granules_rnd final;
explain pipeline select * from tbl_8parts_${layers}granules_rnd final settings max_threads = 16;" 2>&1 |
explain pipeline select * from tbl_8parts_${layers}granules_rnd final settings max_threads = 16, do_not_merge_across_partitions_select_final = 0;;" 2>&1 |
grep -c "CollapsingSortedTransform"
}
@ -25,7 +25,7 @@ test_sequential_values() {
create table tbl_8parts_${layers}granules_seq (key1 UInt32, sign Int8) engine = CollapsingMergeTree(sign) order by (key1) partition by (key1 / $((layers * 8192)))::UInt64;
insert into tbl_8parts_${layers}granules_seq select number, 1 from numbers_mt($((layers * 8 * 8192)));
optimize table tbl_8parts_${layers}granules_seq final;
explain pipeline select * from tbl_8parts_${layers}granules_seq final settings max_threads = 8;" 2>&1 |
explain pipeline select * from tbl_8parts_${layers}granules_seq final settings max_threads = 8, do_not_merge_across_partitions_select_final = 0;" 2>&1 |
grep -c "CollapsingSortedTransform"
}