Update test, reduce num_threads_for_lonely_parts if data is small

Pavel Kruglov 2021-01-29 21:00:08 +03:00
parent 71f4acd48b
commit 78371e15dc
2 changed files with 31 additions and 12 deletions


@@ -1335,13 +1335,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
         data_settings->index_granularity,
         index_granularity_bytes);
-    const size_t min_marks_for_concurrent_read = minMarksForConcurrentRead(
-        settings.merge_tree_min_rows_for_concurrent_read,
-        settings.merge_tree_min_bytes_for_concurrent_read,
-        data_settings->index_granularity,
-        index_granularity_bytes,
-        sum_marks);
-
     if (sum_marks > max_marks_to_use_cache)
         use_uncompressed_cache = false;
@@ -1381,6 +1374,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
     /// we will store lonely parts with level > 0 to use parallel select on them.
     std::vector<RangesInDataPart> lonely_parts;
     size_t total_rows_in_lonely_parts = 0;
+    size_t sum_marks_in_lonely_parts = 0;

     for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index)
     {
@@ -1398,6 +1392,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
             parts_to_merge_ranges[range_index]->data_part->info.level > 0)
         {
             total_rows_in_lonely_parts += parts_to_merge_ranges[range_index]->getRowsCount();
+            sum_marks_in_lonely_parts += parts_to_merge_ranges[range_index]->getMarksCount();
             lonely_parts.push_back(std::move(*parts_to_merge_ranges[range_index]));
             continue;
         }
@@ -1485,9 +1480,21 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
         size_t num_streams_for_lonely_parts = num_streams * lonely_parts.size();

+        const size_t min_marks_for_concurrent_read = minMarksForConcurrentRead(
+            settings.merge_tree_min_rows_for_concurrent_read,
+            settings.merge_tree_min_bytes_for_concurrent_read,
+            data_settings->index_granularity,
+            index_granularity_bytes,
+            sum_marks_in_lonely_parts);
+
+        /// Reduce num_streams_for_lonely_parts if the data is small.
+        if (sum_marks_in_lonely_parts < num_streams_for_lonely_parts * min_marks_for_concurrent_read && lonely_parts.size() < num_streams_for_lonely_parts)
+            num_streams_for_lonely_parts = std::max((sum_marks_in_lonely_parts + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, lonely_parts.size());
+
         MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
             num_streams_for_lonely_parts,
-            sum_marks,
+            sum_marks_in_lonely_parts,
             min_marks_for_concurrent_read,
             std::move(lonely_parts),
             data,
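In plain terms, the clamp added above caps the stream count at the ceiling of sum_marks_in_lonely_parts / min_marks_for_concurrent_read, while never dropping below one stream per lonely part. Below is a minimal, self-contained sketch of that arithmetic; the function name and the example values are illustrative, not part of the commit:

    #include <algorithm>
    #include <cstddef>
    #include <iostream>

    /// Sketch of the clamp: do not request more streams than the data can keep
    /// busy, but keep at least one stream per lonely part.
    size_t clampStreamsForLonelyParts(
        size_t requested_streams,
        size_t sum_marks,
        size_t min_marks_per_stream,
        size_t num_parts)
    {
        if (sum_marks < requested_streams * min_marks_per_stream && num_parts < requested_streams)
        {
            /// Ceiling division: how many streams can each read at least
            /// min_marks_per_stream marks.
            const size_t streams_by_marks = (sum_marks + min_marks_per_stream - 1) / min_marks_per_stream;
            return std::max(streams_by_marks, num_parts);
        }
        return requested_streams;
    }

    int main()
    {
        /// Tiny data: 2 lonely parts with 10 marks in total, 20 marks wanted per
        /// stream, 8 streams requested -> max(ceil(10 / 20), 2) = 2 streams.
        std::cout << clampStreamsForLonelyParts(8, 10, 20, 2) << '\n';

        /// Larger data: 62 marks, 20 per stream -> ceil(62 / 20) = 4 streams.
        std::cout << clampStreamsForLonelyParts(16, 62, 20, 1) << '\n';
    }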


@@ -1,5 +1,7 @@
 DROP TABLE IF EXISTS select_final;

+SET do_not_merge_across_partitions_select_final = 1;
+
 CREATE TABLE select_final (t DateTime, x Int32, string String) ENGINE = ReplacingMergeTree() PARTITION BY toYYYYMM(t) ORDER BY (x, t);

 INSERT INTO select_final SELECT toDate('2000-01-01'), number, '' FROM numbers(2);
@@ -9,7 +11,7 @@ INSERT INTO select_final SELECT toDate('2020-01-01'), number, '' FROM numbers(2)
 INSERT INTO select_final SELECT toDate('2020-01-01'), number + 1, '' FROM numbers(2);

-SELECT * FROM select_final FINAL ORDER BY x SETTINGS do_not_merge_across_partitions_select_final = 1;
+SELECT * FROM select_final FINAL ORDER BY x;

 TRUNCATE TABLE select_final;
@@ -21,8 +23,18 @@ OPTIMIZE TABLE select_final FINAL;
 INSERT INTO select_final SELECT toDate('2020-01-01'), number, '' FROM numbers(2);
 INSERT INTO select_final SELECT toDate('2020-01-01'), number, 'updated' FROM numbers(2);

-SELECT max(x) FROM select_final FINAL where string = 'updated' SETTINGS do_not_merge_across_partitions_select_final = 1;
-SELECT arrayUniq(thread_ids) FROM system.query_log ORDER BY event_time LIMIT 1;
+SELECT max(x) FROM select_final FINAL where string = 'updated';
+
+TRUNCATE TABLE select_final;
+
+INSERT INTO select_final SELECT toDate('2000-01-01'), number, '' FROM numbers(500000);
+
+OPTIMIZE TABLE select_final FINAL;
+
+SELECT max(x) FROM select_final FINAL;
+
+SYSTEM FLUSH LOGS;
+SELECT length(thread_ids) > 1 FROM system.query_log WHERE query='SELECT max(x) FROM select_final FINAL;' AND type='QueryFinish' ORDER BY event_time DESC LIMIT 1;

 DROP TABLE select_final;
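A note on why the new 500000-row case is expected to stay multi-threaded: assuming the default index_granularity of 8192 and merge_tree_min_rows_for_concurrent_read of 163840 (both are assumptions about the defaults, not stated in the diff), the optimized part holds roughly ceil(500000 / 8192) = 62 marks and min_marks_for_concurrent_read comes out to about 163840 / 8192 = 20. The clamp sketched above then yields max(ceil(62 / 20), 1) = 4 streams, so length(thread_ids) > 1 holds, while the two-row inserts earlier in the test collapse to a single stream.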