diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index dc210d7bc33..77559c52c4e 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -1240,6 +1240,11 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( if (num_streams > settings.max_final_threads) num_streams = settings.max_final_threads; + /// If setting do_not_merge_across_partitions_select_final is true than we won't merge parts from different partitions. + /// We have all parts in parts vector, where parts with same partition are nerby. + /// So we will store iterators pointed to the beginning of each partition range (and parts.end()), + /// then we will create a pipe for each partition that will run selecting processor and merging processor + /// for the parts with this partition. In the end we will unite all the pipes. std::vector parts_to_merge_ranges; auto it = parts.begin(); parts_to_merge_ranges.push_back(it); @@ -1252,14 +1257,17 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( it, parts.end(), [&it](auto & part) { return it->data_part->info.partition_id != part.data_part->info.partition_id; }); parts_to_merge_ranges.push_back(it); } + /// We divide threads for each partition equally. But we will create at least the number of partitions threads. + /// (So, the total number of threads could be more than initial num_streams. num_streams /= (parts_to_merge_ranges.size() - 1); } else { + /// If do_not_merge_across_partitions_select_final is false we just merge all the parts. parts_to_merge_ranges.push_back(parts.end()); } - Pipes select_and_merge_pipes; + Pipes partition_pipes; for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index) { @@ -1270,7 +1278,6 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it) { - LOG_DEBUG(log, "Partition id: {}", part_it->data_part->info.partition_id); auto source_processor = std::make_shared( data, metadata_snapshot, @@ -1297,9 +1304,13 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( if (!out_projection) out_projection = createProjection(pipe, data); - if (std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 && parts_to_merge_ranges[range_index]->data_part->info.level > 0) + /// If do_not_merge_across_partitions_select_final is true, there is only one part in partition and it's level > 0 + /// then we won't merge this part. + if (data_settings->do_not_merge_across_partitions_select_final && + std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 && + parts_to_merge_ranges[range_index]->data_part->info.level > 0) { - select_and_merge_pipes.emplace_back(std::move(pipe)); + partition_pipes.emplace_back(std::move(pipe)); continue; } @@ -1359,7 +1370,7 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( if (num_streams <= 1 || sort_description.empty()) { pipe.addTransform(get_merging_processor()); - select_and_merge_pipes.emplace_back(std::move(pipe)); + partition_pipes.emplace_back(std::move(pipe)); continue; } @@ -1413,10 +1424,10 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( return processors; }); - select_and_merge_pipes.emplace_back(std::move(pipe)); + partition_pipes.emplace_back(std::move(pipe)); } - return Pipe::unitePipes(std::move(select_and_merge_pipes)); + return Pipe::unitePipes(std::move(partition_pipes)); } /// Calculates a set of mark ranges, that could possibly contain keys, required by condition. diff --git a/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.reference b/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.reference new file mode 100644 index 00000000000..4c85a1d418a --- /dev/null +++ b/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.reference @@ -0,0 +1,6 @@ +2000-01-01 00:00:00 0 +2020-01-01 00:00:00 0 +2000-01-01 00:00:00 1 +2020-01-01 00:00:00 1 +2000-01-01 00:00:00 2 +2020-01-01 00:00:00 2 diff --git a/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.sql b/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.sql new file mode 100644 index 00000000000..d670cf8594c --- /dev/null +++ b/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.sql @@ -0,0 +1,15 @@ +DROP TABLE IF EXISTS select_final; + +CREATE TABLE select_final (t DateTime, x Int32) ENGINE = ReplacingMergeTree() PARTITION BY toYYYYMM(t) ORDER BY x SETTINGS do_not_merge_across_partitions_select_final = 1; + +INSERT INTO select_final SELECT toDate('2000-01-01'), number FROM numbers(2); +INSERT INTO select_final SELECT toDate('2000-01-01'), number + 1 FROM numbers(2); + +INSERT INTO select_final SELECT toDate('2020-01-01'), number FROM numbers(2); +INSERT INTO select_final SELECT toDate('2020-01-01'), number + 1 FROM numbers(2); + + +SELECT * FROM select_final FINAL ORDER BY x; + +DROP TABLE select_final; +