From 8200bab8592c2b8ea9b5922c5168d805f5304346 Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Tue, 13 Oct 2020 17:54:52 +0300 Subject: [PATCH 01/10] Add setting do_not_merge_across_partitions --- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 288 ++++++++++-------- src/Storages/MergeTree/MergeTreeSettings.h | 2 + 2 files changed, 167 insertions(+), 123 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 8c1dc845d26..dc210d7bc33 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -1237,144 +1237,186 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( if (sum_marks > max_marks_to_use_cache) use_uncompressed_cache = false; - Pipe pipe; - - { - Pipes pipes; - - for (const auto & part : parts) - { - auto source_processor = std::make_shared( - data, metadata_snapshot, part.data_part, max_block_size, settings.preferred_block_size_bytes, - settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, - use_uncompressed_cache, - query_info.prewhere_info, true, reader_settings, - virt_columns, part.part_index_in_query); - - pipes.emplace_back(std::move(source_processor)); - } - - pipe = Pipe::unitePipes(std::move(pipes)); - } - - /// Drop temporary columns, added by 'sorting_key_expr' - if (!out_projection) - out_projection = createProjection(pipe, data); - - pipe.addSimpleTransform([&metadata_snapshot](const Block & header) - { - return std::make_shared(header, metadata_snapshot->getSortingKey().expression); - }); - - Names sort_columns = metadata_snapshot->getSortingKeyColumns(); - SortDescription sort_description; - size_t sort_columns_size = sort_columns.size(); - sort_description.reserve(sort_columns_size); - - Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names; - - Block header = pipe.getHeader(); - for (size_t i = 0; i < sort_columns_size; ++i) - sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1); - - auto get_merging_processor = [&]() -> MergingTransformPtr - { - switch (data.merging_params.mode) - { - case MergeTreeData::MergingParams::Ordinary: - { - return std::make_shared(header, pipe.numOutputPorts(), - sort_description, max_block_size); - } - - case MergeTreeData::MergingParams::Collapsing: - return std::make_shared(header, pipe.numOutputPorts(), - sort_description, data.merging_params.sign_column, true, max_block_size); - - case MergeTreeData::MergingParams::Summing: - return std::make_shared(header, pipe.numOutputPorts(), - sort_description, data.merging_params.columns_to_sum, partition_key_columns, max_block_size); - - case MergeTreeData::MergingParams::Aggregating: - return std::make_shared(header, pipe.numOutputPorts(), - sort_description, max_block_size); - - case MergeTreeData::MergingParams::Replacing: - return std::make_shared(header, pipe.numOutputPorts(), - sort_description, data.merging_params.version_column, max_block_size); - - case MergeTreeData::MergingParams::VersionedCollapsing: - return std::make_shared(header, pipe.numOutputPorts(), - sort_description, data.merging_params.sign_column, max_block_size); - - case MergeTreeData::MergingParams::Graphite: - throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR); - } - - __builtin_unreachable(); - }; - if (num_streams > settings.max_final_threads) num_streams = settings.max_final_threads; - if (num_streams <= 1 || sort_description.empty()) + std::vector parts_to_merge_ranges; + auto it = parts.begin(); + parts_to_merge_ranges.push_back(it); + + if (data_settings->do_not_merge_across_partitions_select_final) { - pipe.addTransform(get_merging_processor()); - return pipe; - } - - ColumnNumbers key_columns; - key_columns.reserve(sort_description.size()); - - for (auto & desc : sort_description) - { - if (!desc.column_name.empty()) - key_columns.push_back(header.getPositionByName(desc.column_name)); - else - key_columns.emplace_back(desc.column_number); - } - - pipe.addSimpleTransform([&](const Block & stream_header) - { - return std::make_shared(stream_header, num_streams, key_columns); - }); - - pipe.transform([&](OutputPortRawPtrs ports) - { - Processors processors; - std::vector output_ports; - processors.reserve(ports.size() + num_streams); - output_ports.reserve(ports.size()); - - for (auto & port : ports) + while (it != parts.end()) { - auto copier = std::make_shared(header, num_streams); - connect(*port, copier->getInputPort()); - output_ports.emplace_back(copier->getOutputs().begin()); - processors.emplace_back(std::move(copier)); + it = std::find_if( + it, parts.end(), [&it](auto & part) { return it->data_part->info.partition_id != part.data_part->info.partition_id; }); + parts_to_merge_ranges.push_back(it); } + num_streams /= (parts_to_merge_ranges.size() - 1); + } + else + { + parts_to_merge_ranges.push_back(parts.end()); + } + + Pipes select_and_merge_pipes; + + for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index) + { + Pipe pipe; - for (size_t i = 0; i < num_streams; ++i) { - auto merge = get_merging_processor(); - merge->setSelectorPosition(i); - auto input = merge->getInputs().begin(); + Pipes pipes; - /// Connect i-th merge with i-th input port of every copier. - for (size_t j = 0; j < ports.size(); ++j) + for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it) { - connect(*output_ports[j], *input); - ++output_ports[j]; - ++input; + LOG_DEBUG(log, "Partition id: {}", part_it->data_part->info.partition_id); + auto source_processor = std::make_shared( + data, + metadata_snapshot, + part_it->data_part, + max_block_size, + settings.preferred_block_size_bytes, + settings.preferred_max_column_in_block_size_bytes, + column_names, + part_it->ranges, + use_uncompressed_cache, + query_info.prewhere_info, + true, + reader_settings, + virt_columns, + part_it->part_index_in_query); + + pipes.emplace_back(std::move(source_processor)); } - processors.emplace_back(std::move(merge)); + pipe = Pipe::unitePipes(std::move(pipes)); } - return processors; - }); + /// Drop temporary columns, added by 'sorting_key_expr' + if (!out_projection) + out_projection = createProjection(pipe, data); - return pipe; + if (std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 && parts_to_merge_ranges[range_index]->data_part->info.level > 0) + { + select_and_merge_pipes.emplace_back(std::move(pipe)); + continue; + } + + pipe.addSimpleTransform([&metadata_snapshot](const Block & header) { + return std::make_shared(header, metadata_snapshot->getSortingKey().expression); + }); + + Names sort_columns = metadata_snapshot->getSortingKeyColumns(); + SortDescription sort_description; + size_t sort_columns_size = sort_columns.size(); + sort_description.reserve(sort_columns_size); + + Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names; + + Block header = pipe.getHeader(); + for (size_t i = 0; i < sort_columns_size; ++i) + sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1); + + auto get_merging_processor = [&]() -> MergingTransformPtr { + switch (data.merging_params.mode) + { + case MergeTreeData::MergingParams::Ordinary: { + return std::make_shared(header, pipe.numOutputPorts(), sort_description, max_block_size); + } + + case MergeTreeData::MergingParams::Collapsing: + return std::make_shared( + header, pipe.numOutputPorts(), sort_description, data.merging_params.sign_column, true, max_block_size); + + case MergeTreeData::MergingParams::Summing: + return std::make_shared( + header, + pipe.numOutputPorts(), + sort_description, + data.merging_params.columns_to_sum, + partition_key_columns, + max_block_size); + + case MergeTreeData::MergingParams::Aggregating: + return std::make_shared(header, pipe.numOutputPorts(), sort_description, max_block_size); + + case MergeTreeData::MergingParams::Replacing: + return std::make_shared( + header, pipe.numOutputPorts(), sort_description, data.merging_params.version_column, max_block_size); + + case MergeTreeData::MergingParams::VersionedCollapsing: + return std::make_shared( + header, pipe.numOutputPorts(), sort_description, data.merging_params.sign_column, max_block_size); + + case MergeTreeData::MergingParams::Graphite: + throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR); + } + + __builtin_unreachable(); + }; + + if (num_streams <= 1 || sort_description.empty()) + { + pipe.addTransform(get_merging_processor()); + select_and_merge_pipes.emplace_back(std::move(pipe)); + continue; + } + + ColumnNumbers key_columns; + key_columns.reserve(sort_description.size()); + + for (auto & desc : sort_description) + { + if (!desc.column_name.empty()) + key_columns.push_back(header.getPositionByName(desc.column_name)); + else + key_columns.emplace_back(desc.column_number); + } + + pipe.addSimpleTransform([&](const Block & stream_header) { + return std::make_shared(stream_header, num_streams, key_columns); + }); + + pipe.transform([&](OutputPortRawPtrs ports) { + Processors processors; + std::vector output_ports; + processors.reserve(ports.size() + num_streams); + output_ports.reserve(ports.size()); + + LOG_DEBUG(log, "Output ports size: {}", ports.size()); + + for (auto & port : ports) + { + auto copier = std::make_shared(header, num_streams); + connect(*port, copier->getInputPort()); + output_ports.emplace_back(copier->getOutputs().begin()); + processors.emplace_back(std::move(copier)); + } + + for (size_t i = 0; i < num_streams; ++i) + { + auto merge = get_merging_processor(); + merge->setSelectorPosition(i); + auto input = merge->getInputs().begin(); + + /// Connect i-th merge with i-th input port of every copier. + for (size_t j = 0; j < ports.size(); ++j) + { + connect(*output_ports[j], *input); + ++output_ports[j]; + ++input; + } + + processors.emplace_back(std::move(merge)); + } + + return processors; + }); + select_and_merge_pipes.emplace_back(std::move(pipe)); + } + + return Pipe::unitePipes(std::move(select_and_merge_pipes)); } /// Calculates a set of mark ranges, that could possibly contain keys, required by condition. diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index 97bc73caf5b..a9bd4248259 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -112,6 +112,8 @@ struct Settings; /** Obsolete settings. Kept for backward compatibility only. */ \ M(UInt64, min_relative_delay_to_yield_leadership, 120, "Obsolete setting, does nothing.", 0) \ M(UInt64, check_delay_period, 60, "Obsolete setting, does nothing.", 0) \ + /** Select settings */ \ + M(Bool, do_not_merge_across_partitions_select_final, false, "Merge parts only in one partition in select final", 0) \ /// Settings that should not change after the creation of a table. #define APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(M) \ From be0cb31d211a7f98bc765720ece209cacdfd3030 Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Tue, 13 Oct 2020 21:55:03 +0300 Subject: [PATCH 02/10] Add tests and comments --- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 25 +++++++++++++------ ...e_across_partitions_select_final.reference | 6 +++++ ...t_merge_across_partitions_select_final.sql | 15 +++++++++++ 3 files changed, 39 insertions(+), 7 deletions(-) create mode 100644 tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.reference create mode 100644 tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.sql diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index dc210d7bc33..77559c52c4e 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -1240,6 +1240,11 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( if (num_streams > settings.max_final_threads) num_streams = settings.max_final_threads; + /// If setting do_not_merge_across_partitions_select_final is true than we won't merge parts from different partitions. + /// We have all parts in parts vector, where parts with same partition are nerby. + /// So we will store iterators pointed to the beginning of each partition range (and parts.end()), + /// then we will create a pipe for each partition that will run selecting processor and merging processor + /// for the parts with this partition. In the end we will unite all the pipes. std::vector parts_to_merge_ranges; auto it = parts.begin(); parts_to_merge_ranges.push_back(it); @@ -1252,14 +1257,17 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( it, parts.end(), [&it](auto & part) { return it->data_part->info.partition_id != part.data_part->info.partition_id; }); parts_to_merge_ranges.push_back(it); } + /// We divide threads for each partition equally. But we will create at least the number of partitions threads. + /// (So, the total number of threads could be more than initial num_streams. num_streams /= (parts_to_merge_ranges.size() - 1); } else { + /// If do_not_merge_across_partitions_select_final is false we just merge all the parts. parts_to_merge_ranges.push_back(parts.end()); } - Pipes select_and_merge_pipes; + Pipes partition_pipes; for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index) { @@ -1270,7 +1278,6 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it) { - LOG_DEBUG(log, "Partition id: {}", part_it->data_part->info.partition_id); auto source_processor = std::make_shared( data, metadata_snapshot, @@ -1297,9 +1304,13 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( if (!out_projection) out_projection = createProjection(pipe, data); - if (std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 && parts_to_merge_ranges[range_index]->data_part->info.level > 0) + /// If do_not_merge_across_partitions_select_final is true, there is only one part in partition and it's level > 0 + /// then we won't merge this part. + if (data_settings->do_not_merge_across_partitions_select_final && + std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 && + parts_to_merge_ranges[range_index]->data_part->info.level > 0) { - select_and_merge_pipes.emplace_back(std::move(pipe)); + partition_pipes.emplace_back(std::move(pipe)); continue; } @@ -1359,7 +1370,7 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( if (num_streams <= 1 || sort_description.empty()) { pipe.addTransform(get_merging_processor()); - select_and_merge_pipes.emplace_back(std::move(pipe)); + partition_pipes.emplace_back(std::move(pipe)); continue; } @@ -1413,10 +1424,10 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( return processors; }); - select_and_merge_pipes.emplace_back(std::move(pipe)); + partition_pipes.emplace_back(std::move(pipe)); } - return Pipe::unitePipes(std::move(select_and_merge_pipes)); + return Pipe::unitePipes(std::move(partition_pipes)); } /// Calculates a set of mark ranges, that could possibly contain keys, required by condition. diff --git a/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.reference b/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.reference new file mode 100644 index 00000000000..4c85a1d418a --- /dev/null +++ b/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.reference @@ -0,0 +1,6 @@ +2000-01-01 00:00:00 0 +2020-01-01 00:00:00 0 +2000-01-01 00:00:00 1 +2020-01-01 00:00:00 1 +2000-01-01 00:00:00 2 +2020-01-01 00:00:00 2 diff --git a/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.sql b/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.sql new file mode 100644 index 00000000000..d670cf8594c --- /dev/null +++ b/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.sql @@ -0,0 +1,15 @@ +DROP TABLE IF EXISTS select_final; + +CREATE TABLE select_final (t DateTime, x Int32) ENGINE = ReplacingMergeTree() PARTITION BY toYYYYMM(t) ORDER BY x SETTINGS do_not_merge_across_partitions_select_final = 1; + +INSERT INTO select_final SELECT toDate('2000-01-01'), number FROM numbers(2); +INSERT INTO select_final SELECT toDate('2000-01-01'), number + 1 FROM numbers(2); + +INSERT INTO select_final SELECT toDate('2020-01-01'), number FROM numbers(2); +INSERT INTO select_final SELECT toDate('2020-01-01'), number + 1 FROM numbers(2); + + +SELECT * FROM select_final FINAL ORDER BY x; + +DROP TABLE select_final; + From 44c2b138f32a732f60730bcaebf19c60dbe9e01f Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Tue, 13 Oct 2020 22:53:36 +0300 Subject: [PATCH 03/10] Fix style --- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 77559c52c4e..027f982ac24 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -1314,7 +1314,8 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( continue; } - pipe.addSimpleTransform([&metadata_snapshot](const Block & header) { + pipe.addSimpleTransform([&metadata_snapshot](const Block & header) + { return std::make_shared(header, metadata_snapshot->getSortingKey().expression); }); @@ -1329,7 +1330,8 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( for (size_t i = 0; i < sort_columns_size; ++i) sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1); - auto get_merging_processor = [&]() -> MergingTransformPtr { + auto get_merging_processor = [&]() -> MergingTransformPtr + { switch (data.merging_params.mode) { case MergeTreeData::MergingParams::Ordinary: { @@ -1385,11 +1387,13 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( key_columns.emplace_back(desc.column_number); } - pipe.addSimpleTransform([&](const Block & stream_header) { + pipe.addSimpleTransform([&](const Block & stream_header) + { return std::make_shared(stream_header, num_streams, key_columns); }); - pipe.transform([&](OutputPortRawPtrs ports) { + pipe.transform([&](OutputPortRawPtrs ports) + { Processors processors; std::vector output_ports; processors.reserve(ports.size() + num_streams); From f5fac575f4825755da549567d8bbca4e16a1693b Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Thu, 15 Oct 2020 15:22:41 +0300 Subject: [PATCH 04/10] don't postprocess single parts --- src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 027f982ac24..faa9b80e3c8 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -1304,11 +1304,10 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( if (!out_projection) out_projection = createProjection(pipe, data); - /// If do_not_merge_across_partitions_select_final is true, there is only one part in partition and it's level > 0 - /// then we won't merge this part. + /// If do_not_merge_across_partitions_select_final is true and there is only one part in partition + /// then we won't postprocess this part if (data_settings->do_not_merge_across_partitions_select_final && - std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 && - parts_to_merge_ranges[range_index]->data_part->info.level > 0) + std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1) { partition_pipes.emplace_back(std::move(pipe)); continue; From 89fdeb4e15ba8f4900b2dd07dd61fb2df727f6e5 Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Wed, 21 Oct 2020 20:35:31 +0300 Subject: [PATCH 05/10] Fix style, move setting and add checking level>0 --- src/Core/Settings.h | 4 +++- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 14 +++++++------- src/Storages/MergeTree/MergeTreeSettings.h | 3 --- ...do_not_merge_across_partitions_select_final.sql | 4 ++-- 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index a1a7a690e40..770ac68aba1 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -398,7 +398,9 @@ class IColumn; M(Bool, force_optimize_skip_unused_shards_no_nested, false, "Obsolete setting, does nothing. Will be removed after 2020-12-01. Use force_optimize_skip_unused_shards_nesting instead.", 0) \ M(Bool, experimental_use_processors, true, "Obsolete setting, does nothing. Will be removed after 2020-11-29.", 0) \ M(Bool, optimize_trivial_insert_select, true, "Optimize trivial 'INSERT INTO table SELECT ... FROM TABLES' query", 0) \ - M(Bool, allow_experimental_database_atomic, true, "Obsolete setting, does nothing. Will be removed after 2021-02-12", 0) + M(Bool, allow_experimental_database_atomic, true, "Obsolete setting, does nothing. Will be removed after 2021-02-12", 0) \ + \ + M(Bool, do_not_merge_across_partitions_select_final, false, "Merge parts only in one partition in select final", 0) \ #define FORMAT_FACTORY_SETTINGS(M) \ M(Char, format_csv_delimiter, ',', "The character to be considered as a delimiter in CSV data. If setting with a string, a string has to have a length of 1.", 0) \ diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index faa9b80e3c8..5acbec9d5b3 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -1249,7 +1249,7 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( auto it = parts.begin(); parts_to_merge_ranges.push_back(it); - if (data_settings->do_not_merge_across_partitions_select_final) + if (settings.do_not_merge_across_partitions_select_final) { while (it != parts.end()) { @@ -1305,9 +1305,10 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( out_projection = createProjection(pipe, data); /// If do_not_merge_across_partitions_select_final is true and there is only one part in partition - /// then we won't postprocess this part - if (data_settings->do_not_merge_across_partitions_select_final && - std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1) + /// with level > 0 then we won't postprocess this part + if (settings.do_not_merge_across_partitions_select_final && + std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 && + parts_to_merge_ranges[range_index]->data_part->info.level > 0) { partition_pipes.emplace_back(std::move(pipe)); continue; @@ -1333,7 +1334,8 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( { switch (data.merging_params.mode) { - case MergeTreeData::MergingParams::Ordinary: { + case MergeTreeData::MergingParams::Ordinary: + { return std::make_shared(header, pipe.numOutputPorts(), sort_description, max_block_size); } @@ -1398,8 +1400,6 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( processors.reserve(ports.size() + num_streams); output_ports.reserve(ports.size()); - LOG_DEBUG(log, "Output ports size: {}", ports.size()); - for (auto & port : ports) { auto copier = std::make_shared(header, num_streams); diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index a9bd4248259..fe97298fe75 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -112,9 +112,6 @@ struct Settings; /** Obsolete settings. Kept for backward compatibility only. */ \ M(UInt64, min_relative_delay_to_yield_leadership, 120, "Obsolete setting, does nothing.", 0) \ M(UInt64, check_delay_period, 60, "Obsolete setting, does nothing.", 0) \ - /** Select settings */ \ - M(Bool, do_not_merge_across_partitions_select_final, false, "Merge parts only in one partition in select final", 0) \ - /// Settings that should not change after the creation of a table. #define APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(M) \ M(index_granularity) diff --git a/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.sql b/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.sql index d670cf8594c..08ac9a6aa1a 100644 --- a/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.sql +++ b/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.sql @@ -1,6 +1,6 @@ DROP TABLE IF EXISTS select_final; -CREATE TABLE select_final (t DateTime, x Int32) ENGINE = ReplacingMergeTree() PARTITION BY toYYYYMM(t) ORDER BY x SETTINGS do_not_merge_across_partitions_select_final = 1; +CREATE TABLE select_final (t DateTime, x Int32) ENGINE = ReplacingMergeTree() PARTITION BY toYYYYMM(t) ORDER BY x; INSERT INTO select_final SELECT toDate('2000-01-01'), number FROM numbers(2); INSERT INTO select_final SELECT toDate('2000-01-01'), number + 1 FROM numbers(2); @@ -9,7 +9,7 @@ INSERT INTO select_final SELECT toDate('2020-01-01'), number FROM numbers(2); INSERT INTO select_final SELECT toDate('2020-01-01'), number + 1 FROM numbers(2); -SELECT * FROM select_final FINAL ORDER BY x; +SELECT * FROM select_final FINAL ORDER BY x SETTINGS do_not_merge_across_partitions_select_final = 1;; DROP TABLE select_final; From 10fb8624db2bcbed2c659793bc4c28b272671cd7 Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Wed, 21 Oct 2020 20:44:35 +0300 Subject: [PATCH 06/10] Remove double simbol in test --- .../01524_do_not_merge_across_partitions_select_final.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.sql b/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.sql index 08ac9a6aa1a..d332946605d 100644 --- a/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.sql +++ b/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.sql @@ -9,7 +9,7 @@ INSERT INTO select_final SELECT toDate('2020-01-01'), number FROM numbers(2); INSERT INTO select_final SELECT toDate('2020-01-01'), number + 1 FROM numbers(2); -SELECT * FROM select_final FINAL ORDER BY x SETTINGS do_not_merge_across_partitions_select_final = 1;; +SELECT * FROM select_final FINAL ORDER BY x SETTINGS do_not_merge_across_partitions_select_final = 1; DROP TABLE select_final; From 0274c9e9eb0664bd6bd6a40f452707312e692b35 Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Fri, 23 Oct 2020 12:05:57 +0300 Subject: [PATCH 07/10] Add perf test --- tests/performance/optimized_select_final.xml | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 tests/performance/optimized_select_final.xml diff --git a/tests/performance/optimized_select_final.xml b/tests/performance/optimized_select_final.xml new file mode 100644 index 00000000000..e8f39239f9b --- /dev/null +++ b/tests/performance/optimized_select_final.xml @@ -0,0 +1,20 @@ + + + 1 + + + + CREATE TABLE optimized_select_final (t DateTime, x Int32) + ENGINE = ReplacingMergeTree() + PARTITION BY toYYYYMM(t) ORDER BY x + + + INSERT INTO optimized_select_final SELECT toDate('2000-01-01'), number FROM numbers(1000000) + + INSERT INTO optimized_select_final SELECT toDate('2020-01-01'), number FROM numbers(1000000) + + SELECT * FROM optimized_select_final final + + DROP TABLE IF EXISTS optimized_select_final + + From 4592c5e59b1915bfa472bf8561722bbf173d21c0 Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Tue, 27 Oct 2020 14:42:26 +0300 Subject: [PATCH 08/10] Add OPTIMIZE in perf test --- tests/performance/optimized_select_final.xml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/performance/optimized_select_final.xml b/tests/performance/optimized_select_final.xml index e8f39239f9b..8bdaa2a2e70 100644 --- a/tests/performance/optimized_select_final.xml +++ b/tests/performance/optimized_select_final.xml @@ -13,7 +13,9 @@ INSERT INTO optimized_select_final SELECT toDate('2020-01-01'), number FROM numbers(1000000) - SELECT * FROM optimized_select_final final + OPTIMIZE TABLE optimized_select_final + + SELECT * FROM optimized_select_final FINAL DROP TABLE IF EXISTS optimized_select_final From 9f0f981642cc9c7bf651585b26bf5f6b9559552f Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Thu, 29 Oct 2020 14:03:50 +0300 Subject: [PATCH 09/10] Update optimized_select_final.xml --- tests/performance/optimized_select_final.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/performance/optimized_select_final.xml b/tests/performance/optimized_select_final.xml index 8bdaa2a2e70..c904e552120 100644 --- a/tests/performance/optimized_select_final.xml +++ b/tests/performance/optimized_select_final.xml @@ -15,7 +15,7 @@ OPTIMIZE TABLE optimized_select_final - SELECT * FROM optimized_select_final FINAL + SELECT * FROM optimized_select_final FINAL FORMAT Null DROP TABLE IF EXISTS optimized_select_final From 48185d437ac70b33819ad3d95297efb374a61e8c Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Fri, 30 Oct 2020 17:48:59 +0300 Subject: [PATCH 10/10] Update optimized_select_final.xml --- tests/performance/optimized_select_final.xml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/performance/optimized_select_final.xml b/tests/performance/optimized_select_final.xml index c904e552120..46b408d0cb4 100644 --- a/tests/performance/optimized_select_final.xml +++ b/tests/performance/optimized_select_final.xml @@ -9,9 +9,10 @@ PARTITION BY toYYYYMM(t) ORDER BY x - INSERT INTO optimized_select_final SELECT toDate('2000-01-01'), number FROM numbers(1000000) - - INSERT INTO optimized_select_final SELECT toDate('2020-01-01'), number FROM numbers(1000000) + INSERT INTO optimized_select_final SELECT toDate('2000-01-01'), number FROM numbers(5000000) + INSERT INTO optimized_select_final SELECT toDate('2020-01-01'), number FROM numbers(5000000) + INSERT INTO optimized_select_final SELECT toDate('2021-01-01'), number FROM numbers(5000000) + INSERT INTO optimized_select_final SELECT toDate('2022-01-01'), number FROM numbers(5000000) OPTIMIZE TABLE optimized_select_final