Restrict adding a virtual row to the preliminary-merge case and add more tests

This commit is contained in:
jsc0218 2024-05-08 00:17:37 +00:00
parent 1c2c3aed24
commit 0537b8c833
3 changed files with 45 additions and 12 deletions

View File

@ -1035,7 +1035,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
{ {
/// need_virtual_row = true means a MergingSortedTransform should occur. /// need_virtual_row = true means a MergingSortedTransform should occur.
/// If so, adding a virtual row might speedup in the case of multiple parts. /// If so, adding a virtual row might speedup in the case of multiple parts.
bool need_virtual_row = item.size() > 1; bool need_virtual_row = (need_preliminary_merge || output_each_partition_through_separate_port) && item.size() > 1;
pipes.emplace_back(readInOrder(std::move(item), column_names, pool_settings, read_type, input_order_info->limit, need_virtual_row)); pipes.emplace_back(readInOrder(std::move(item), column_names, pool_settings, read_type, input_order_info->limit, need_virtual_row));
} }
} }

View File

@ -3,8 +3,13 @@
2 2
3 3
16386 16386
16385
16386
16387
16388
24578
0 0
1 1
2 2
3 3
16386 16384

View File

@ -24,28 +24,55 @@ INSERT INTO t SELECT
number + (8192 * 3), number + (8192 * 3),
number + (8192 * 3), number + (8192 * 3),
number + (8192 * 3), number + (8192 * 3),
number + (8192 * 3) number
FROM numbers(8192 * 3); FROM numbers(8192 * 3);
-- Expecting 2 virtual rows + one chunk (8192) for result + one extra chunk for next consumption in merge transform (8192),
-- both chunks come from the same part.
SELECT x SELECT x
FROM t FROM t
ORDER BY x ASC ORDER BY x ASC
LIMIT 4 LIMIT 4
SETTINGS max_block_size = 8192, SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 0, read_in_order_two_level_merge_threshold = 0, --force preliminary merge
max_threads = 1, max_threads = 1,
optimize_read_in_order = 1; optimize_read_in_order = 1,
log_comment = 'no filter';
SYSTEM FLUSH LOGS; SYSTEM FLUSH LOGS;
SELECT read_rows SELECT read_rows
FROM system.query_log FROM system.query_log
WHERE current_database = currentDatabase() WHERE current_database = currentDatabase()
AND query like '%SELECT x%' AND log_comment = 'no filter'
AND query not like '%system.query_log%' AND type = 'QueryFinish'
ORDER BY query_start_time DESC, read_rows DESC ORDER BY query_start_time DESC
limit 1;
-- Expecting 2 virtual rows + two chunks (8192*2) get filtered out + one chunk for result (8192),
-- all chunks come from the same part.
SELECT k
FROM t
WHERE k > 8192 * 2
ORDER BY x ASC
LIMIT 4
SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 0, --force preliminary merge
max_threads = 1,
optimize_read_in_order = 1,
log_comment = 'with filter';
SYSTEM FLUSH LOGS;
SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
AND log_comment = 'with filter'
AND type = 'QueryFinish'
ORDER BY query_start_time DESC
LIMIT 1; LIMIT 1;
-- Should not impact cases without preliminary merge (might read again when chunk row is less than limit)
SELECT x SELECT x
FROM t FROM t
ORDER BY x ASC ORDER BY x ASC
@ -53,16 +80,17 @@ LIMIT 4
SETTINGS max_block_size = 8192, SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 5, --avoid preliminary merge read_in_order_two_level_merge_threshold = 5, --avoid preliminary merge
max_threads = 1, max_threads = 1,
optimize_read_in_order = 1; optimize_read_in_order = 1,
log_comment = 'no impact';
SYSTEM FLUSH LOGS; SYSTEM FLUSH LOGS;
SELECT read_rows SELECT read_rows
FROM system.query_log FROM system.query_log
WHERE current_database = currentDatabase() WHERE current_database = currentDatabase()
AND query like '%SELECT x%' AND log_comment = 'no impact'
AND query not like '%system.query_log%' AND type = 'QueryFinish'
ORDER BY query_start_time DESC, read_rows DESC ORDER BY query_start_time DESC
LIMIT 1; LIMIT 1;
DROP TABLE t; DROP TABLE t;