support non-preliminary merge

This commit is contained in:
jsc0218 2024-09-13 21:09:03 +00:00
parent 79e1ce1d4b
commit 084c8115fe
5 changed files with 75 additions and 7 deletions

View File

@ -820,6 +820,8 @@ InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & n
bool can_read = reading->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit);
if (!can_read)
return nullptr;
reading->enableVirtualRow();
}
return order_info;

View File

@ -1099,9 +1099,11 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
for (auto && item : splitted_parts_and_ranges)
{
/// enable_virtual_row = true means a MergingSortedTransform should occur.
/// If so, adding a virtual row might give a speedup in the case of multiple parts.
enable_virtual_row = (need_preliminary_merge || output_each_partition_through_separate_port) && item.size() > 1;
/// If it has not been enabled before, try to enable it when the conditions are met: as in the following
/// preliminary merge section, only an ExpressionTransform is added between MergingSortedTransform and readFromMergeTree.
if (!enable_virtual_row)
enable_virtual_row = (need_preliminary_merge || output_each_partition_through_separate_port) && item.size() > 1;
pipes.emplace_back(readInOrder(std::move(item), column_names, pool_settings, read_type, input_order_info->limit));
}
}

View File

@ -210,6 +210,8 @@ public:
void applyFilters(ActionDAGNodes added_filter_nodes) override;
/// Switches on the `enable_virtual_row` flag of this step.
/// The flag is only ever set to true here; this method never clears it.
void enableVirtualRow()
{
    enable_virtual_row = true;
}
private:
int getSortDirection() const
{

View File

@ -10,6 +10,18 @@
16388
24576
========
0
1
2
3
16384
========
16385
16386
16387
16388
24578
========
1 2
1 2
1 3

View File

@ -39,14 +39,14 @@ SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 0, --force preliminary merge
max_threads = 1,
optimize_read_in_order = 1,
log_comment = 'no filter';
log_comment = 'preliminary merge, no filter';
SYSTEM FLUSH LOGS;
SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
AND log_comment = 'no filter'
AND log_comment = 'preliminary merge, no filter'
AND type = 'QueryFinish'
ORDER BY query_start_time DESC
limit 1;
@ -63,18 +63,68 @@ SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 0, --force preliminary merge
max_threads = 1,
optimize_read_in_order = 1,
log_comment = 'with filter';
log_comment = 'preliminary merge with filter';
SYSTEM FLUSH LOGS;
SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
AND log_comment = 'with filter'
AND log_comment = 'preliminary merge with filter'
AND type = 'QueryFinish'
ORDER BY query_start_time DESC
LIMIT 1;
SELECT '========';
-- Expecting 2 virtual rows + one chunk (8192) for result + one extra chunk for next consumption in merge transform (8192),
-- both chunks come from the same part.
SELECT x
FROM t
ORDER BY x ASC
LIMIT 4
SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 5, --avoid preliminary merge
max_threads = 1,
optimize_read_in_order = 1,
log_comment = 'no preliminary merge, no filter';
SYSTEM FLUSH LOGS;
SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
AND log_comment = 'no preliminary merge, no filter'
AND type = 'QueryFinish'
ORDER BY query_start_time DESC
LIMIT 1;
SELECT '========';
-- Expecting 2 virtual rows + two chunks (8192*2) get filtered out + one chunk for result (8192),
-- all chunks come from the same part.
SELECT k
FROM t
WHERE k > 8192 * 2
ORDER BY x ASC
LIMIT 4
SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 5, --avoid preliminary merge
read_in_order_use_buffering = false, --avoid buffer
max_threads = 1,
optimize_read_in_order = 1,
log_comment = 'no preliminary merge, with filter';
SYSTEM FLUSH LOGS;
SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
AND log_comment = 'no preliminary merge, with filter'
AND type = 'QueryFinish'
ORDER BY query_start_time DESC
LIMIT 1;
DROP TABLE t;
SELECT '========';
-- from 02149_read_in_order_fixed_prefix
DROP TABLE IF EXISTS fixed_prefix;