diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index cf8799052df..8b1e5989de5 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -1335,13 +1335,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( data_settings->index_granularity, index_granularity_bytes); - const size_t min_marks_for_concurrent_read = minMarksForConcurrentRead( - settings.merge_tree_min_rows_for_concurrent_read, - settings.merge_tree_min_bytes_for_concurrent_read, - data_settings->index_granularity, - index_granularity_bytes, - sum_marks); - if (sum_marks > max_marks_to_use_cache) use_uncompressed_cache = false; @@ -1381,6 +1374,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( /// we will store lonely parts with level > 0 to use parallel select on them. std::vector lonely_parts; size_t total_rows_in_lonely_parts = 0; + size_t sum_marks_in_lonely_parts = 0; for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index) { @@ -1398,6 +1392,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( parts_to_merge_ranges[range_index]->data_part->info.level > 0) { total_rows_in_lonely_parts += parts_to_merge_ranges[range_index]->getRowsCount(); + sum_marks_in_lonely_parts += parts_to_merge_ranges[range_index]->getMarksCount(); lonely_parts.push_back(std::move(*parts_to_merge_ranges[range_index])); continue; } @@ -1485,9 +1480,21 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( size_t num_streams_for_lonely_parts = num_streams * lonely_parts.size(); + const size_t min_marks_for_concurrent_read = minMarksForConcurrentRead( + settings.merge_tree_min_rows_for_concurrent_read, + settings.merge_tree_min_bytes_for_concurrent_read, + data_settings->index_granularity, + index_granularity_bytes, + sum_marks_in_lonely_parts); + + /// Reduce the number of num_streams_for_lonely_parts if the data is small. + if (sum_marks_in_lonely_parts < num_streams_for_lonely_parts * min_marks_for_concurrent_read && lonely_parts.size() < num_streams_for_lonely_parts) + num_streams_for_lonely_parts = std::max((sum_marks_in_lonely_parts + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, lonely_parts.size()); + + MergeTreeReadPoolPtr pool = std::make_shared( num_streams_for_lonely_parts, - sum_marks, + sum_marks_in_lonely_parts, min_marks_for_concurrent_read, std::move(lonely_parts), data, diff --git a/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.sql b/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.sql index 0e0f0325c7b..b50e47daa0c 100644 --- a/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.sql +++ b/tests/queries/0_stateless/01524_do_not_merge_across_partitions_select_final.sql @@ -1,5 +1,7 @@ DROP TABLE IF EXISTS select_final; +SET do_not_merge_across_partitions_select_final = 1; + CREATE TABLE select_final (t DateTime, x Int32, string String) ENGINE = ReplacingMergeTree() PARTITION BY toYYYYMM(t) ORDER BY (x, t); INSERT INTO select_final SELECT toDate('2000-01-01'), number, '' FROM numbers(2); @@ -9,7 +11,7 @@ INSERT INTO select_final SELECT toDate('2020-01-01'), number, '' FROM numbers(2) INSERT INTO select_final SELECT toDate('2020-01-01'), number + 1, '' FROM numbers(2); -SELECT * FROM select_final FINAL ORDER BY x SETTINGS do_not_merge_across_partitions_select_final = 1; +SELECT * FROM select_final FINAL ORDER BY x; TRUNCATE TABLE select_final; @@ -21,8 +23,18 @@ OPTIMIZE TABLE select_final FINAL; INSERT INTO select_final SELECT toDate('2020-01-01'), number, '' FROM numbers(2); INSERT INTO select_final SELECT toDate('2020-01-01'), number, 'updated' FROM numbers(2); -SELECT max(x) FROM select_final FINAL where string = 'updated' SETTINGS do_not_merge_across_partitions_select_final = 1; -SELECT arrayUniq(thread_ids) FROM system.query_log ORDER BY event_time LIMIT 1; +SELECT max(x) FROM select_final FINAL where string = 'updated'; + +TRUNCATE TABLE select_final; + +INSERT INTO select_final SELECT toDate('2000-01-01'), number, '' FROM numbers(500000); + +OPTIMIZE TABLE select_final FINAL; + +SELECT max(x) FROM select_final FINAL; + +SYSTEM FLUSH LOGS; + +SELECT length(thread_ids) > 1 FROM system.query_log WHERE query='SELECT max(x) FROM select_final FINAL;' AND type='QueryFinish' ORDER BY event_time DESC LIMIT 1; DROP TABLE select_final; -