Add tests and comments

This commit is contained in:
Pavel Kruglov 2020-10-13 21:55:03 +03:00
parent 8200bab859
commit be0cb31d21
3 changed files with 39 additions and 7 deletions

View File

@ -1240,6 +1240,11 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
if (num_streams > settings.max_final_threads) if (num_streams > settings.max_final_threads)
num_streams = settings.max_final_threads; num_streams = settings.max_final_threads;
/// If setting do_not_merge_across_partitions_select_final is true than we won't merge parts from different partitions.
/// We have all parts in parts vector, where parts with same partition are nerby.
/// So we will store iterators pointed to the beginning of each partition range (and parts.end()),
/// then we will create a pipe for each partition that will run selecting processor and merging processor
/// for the parts with this partition. In the end we will unite all the pipes.
std::vector<RangesInDataParts::iterator> parts_to_merge_ranges; std::vector<RangesInDataParts::iterator> parts_to_merge_ranges;
auto it = parts.begin(); auto it = parts.begin();
parts_to_merge_ranges.push_back(it); parts_to_merge_ranges.push_back(it);
@ -1252,14 +1257,17 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
it, parts.end(), [&it](auto & part) { return it->data_part->info.partition_id != part.data_part->info.partition_id; }); it, parts.end(), [&it](auto & part) { return it->data_part->info.partition_id != part.data_part->info.partition_id; });
parts_to_merge_ranges.push_back(it); parts_to_merge_ranges.push_back(it);
} }
/// We divide threads for each partition equally. But we will create at least the number of partitions threads.
/// (So, the total number of threads could be more than initial num_streams.
num_streams /= (parts_to_merge_ranges.size() - 1); num_streams /= (parts_to_merge_ranges.size() - 1);
} }
else else
{ {
/// If do_not_merge_across_partitions_select_final is false we just merge all the parts.
parts_to_merge_ranges.push_back(parts.end()); parts_to_merge_ranges.push_back(parts.end());
} }
Pipes select_and_merge_pipes; Pipes partition_pipes;
for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index) for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index)
{ {
@ -1270,7 +1278,6 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it) for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it)
{ {
LOG_DEBUG(log, "Partition id: {}", part_it->data_part->info.partition_id);
auto source_processor = std::make_shared<MergeTreeSelectProcessor>( auto source_processor = std::make_shared<MergeTreeSelectProcessor>(
data, data,
metadata_snapshot, metadata_snapshot,
@ -1297,9 +1304,13 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
if (!out_projection) if (!out_projection)
out_projection = createProjection(pipe, data); out_projection = createProjection(pipe, data);
if (std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 && parts_to_merge_ranges[range_index]->data_part->info.level > 0) /// If do_not_merge_across_partitions_select_final is true, there is only one part in partition and it's level > 0
/// then we won't merge this part.
if (data_settings->do_not_merge_across_partitions_select_final &&
std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
parts_to_merge_ranges[range_index]->data_part->info.level > 0)
{ {
select_and_merge_pipes.emplace_back(std::move(pipe)); partition_pipes.emplace_back(std::move(pipe));
continue; continue;
} }
@ -1359,7 +1370,7 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
if (num_streams <= 1 || sort_description.empty()) if (num_streams <= 1 || sort_description.empty())
{ {
pipe.addTransform(get_merging_processor()); pipe.addTransform(get_merging_processor());
select_and_merge_pipes.emplace_back(std::move(pipe)); partition_pipes.emplace_back(std::move(pipe));
continue; continue;
} }
@ -1413,10 +1424,10 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
return processors; return processors;
}); });
select_and_merge_pipes.emplace_back(std::move(pipe)); partition_pipes.emplace_back(std::move(pipe));
} }
return Pipe::unitePipes(std::move(select_and_merge_pipes)); return Pipe::unitePipes(std::move(partition_pipes));
} }
/// Calculates a set of mark ranges, that could possibly contain keys, required by condition. /// Calculates a set of mark ranges, that could possibly contain keys, required by condition.

View File

@ -0,0 +1,6 @@
2000-01-01 00:00:00 0
2020-01-01 00:00:00 0
2000-01-01 00:00:00 1
2020-01-01 00:00:00 1
2000-01-01 00:00:00 2
2020-01-01 00:00:00 2

View File

@ -0,0 +1,15 @@
DROP TABLE IF EXISTS select_final;
CREATE TABLE select_final (t DateTime, x Int32) ENGINE = ReplacingMergeTree() PARTITION BY toYYYYMM(t) ORDER BY x SETTINGS do_not_merge_across_partitions_select_final = 1;
INSERT INTO select_final SELECT toDate('2000-01-01'), number FROM numbers(2);
INSERT INTO select_final SELECT toDate('2000-01-01'), number + 1 FROM numbers(2);
INSERT INTO select_final SELECT toDate('2020-01-01'), number FROM numbers(2);
INSERT INTO select_final SELECT toDate('2020-01-01'), number + 1 FROM numbers(2);
SELECT * FROM select_final FINAL ORDER BY x;
DROP TABLE select_final;