support non-preliminary merge case

This commit is contained in:
jsc0218 2024-05-06 13:25:19 +00:00
parent 04a757eb71
commit 1c2c3aed24
6 changed files with 34 additions and 29 deletions

View File

@ -102,10 +102,13 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
/// setNotNeeded after reading first chunk, because in optimistic case
/// (e.g. with optimized 'ORDER BY primary_key LIMIT n' and small 'n')
/// we won't have to read any chunks anymore;
/// If virtual row exists, test it first, so don't read more chunks.
/// If virtual row exists, let it pass through, so don't read more chunks.
auto chunk = input.pull(true);
if ((limit_hint == 0 && !getVirtualRowFromChunk(chunk))
|| (limit_hint && chunk.getNumRows() < limit_hint) || always_read_till_end)
bool virtual_row = getVirtualRowFromChunk(chunk);
if (limit_hint == 0 && !virtual_row)
input.setNeeded();
if (!virtual_row && ((limit_hint && chunk.getNumRows() < limit_hint) || always_read_till_end))
input.setNeeded();
if (!chunk.hasRows())

View File

@ -597,10 +597,8 @@ Pipe ReadFromMergeTree::readInOrder(
actions_settings, block_size, reader_settings);
processor->addPartLevelToChunk(isQueryWithFinal());
auto primary_key_index = part_with_ranges.data_part->getIndex();
chassert(primary_key_index);
processor->addVirtualRowToChunk(need_virtual_row, *primary_key_index, part_with_ranges.ranges.front().begin);
processor->addVirtualRowToChunk(need_virtual_row, part_with_ranges.data_part->getIndex(),
part_with_ranges.ranges.front().begin);
auto source = std::make_shared<MergeTreeSource>(std::move(processor));
if (set_rows_approx)
@ -1037,7 +1035,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
{
/// need_virtual_row = true means a MergingSortedTransform should occur.
/// If so, adding a virtual row might speed up the case of multiple parts.
bool need_virtual_row = (need_preliminary_merge || output_each_partition_through_separate_port) && item.size() > 1;
bool need_virtual_row = item.size() > 1;
pipes.emplace_back(readInOrder(std::move(item), column_names, pool_settings, read_type, input_order_info->limit, need_virtual_row));
}
}

View File

@ -152,10 +152,10 @@ ChunkAndProgress MergeTreeSelectProcessor::read()
const ColumnWithTypeAndName & type_and_name = result_header.getByPosition(i);
ColumnPtr current_column = type_and_name.type->createColumn();
if (j < index.size() && type_and_name.name == primary_key.column_names[j] && type_and_name.type == primary_key.data_types[j])
if (j < index->size() && type_and_name.name == primary_key.column_names[j] && type_and_name.type == primary_key.data_types[j])
{
auto column = current_column->cloneEmpty();
column->insert((*index[j])[mark_range_begin]);
column->insert((*(*index)[j])[mark_range_begin]);
ordered_columns.push_back(std::move(column));
++j;
}

View File

@ -65,7 +65,7 @@ public:
void addPartLevelToChunk(bool add_part_level_) { add_part_level = add_part_level_; }
void addVirtualRowToChunk(bool add_virtual_row_, const Columns & index_, size_t mark_range_begin_)
void addVirtualRowToChunk(bool add_virtual_row_, const IMergeTreeDataPart::Index & index_, size_t mark_range_begin_)
{
add_virtual_row = add_virtual_row_;
index = index_;
@ -101,7 +101,7 @@ private:
/// Virtual row is useful for read-in-order optimization when multiple parts exist.
bool add_virtual_row = false;
/// PK index used in virtual row.
Columns index;
IMergeTreeDataPart::Index index;
/// The first range that might contain the candidate, used in virtual row.
size_t mark_range_begin;

View File

@ -3,3 +3,8 @@
2
3
16386
0
1
2
3
16386

View File

@ -46,24 +46,23 @@ AND query not like '%system.query_log%'
ORDER BY query_start_time DESC, read_rows DESC
LIMIT 1;
-- SELECT x
-- FROM t
-- ORDER BY x ASC
-- LIMIT 4
-- SETTINGS max_block_size = 8192,
-- read_in_order_two_level_merge_threshold = 5, --avoid preliminary merge
-- max_threads = 1,
-- optimize_read_in_order = 1;
SELECT x
FROM t
ORDER BY x ASC
LIMIT 4
SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 5, --avoid preliminary merge
max_threads = 1,
optimize_read_in_order = 1;
-- SYSTEM FLUSH LOGS;
SYSTEM FLUSH LOGS;
-- -- without virtual row 16.38k, but with virtual row 24.58k, because read again (why?) in the non-target part after reading its virtual row and before sending the virtual row to the priority queue
-- SELECT read_rows
-- FROM system.query_log
-- WHERE current_database = currentDatabase()
-- AND query like '%SELECT x%'
-- AND query not like '%system.query_log%'
-- ORDER BY query_start_time DESC, read_rows DESC
-- LIMIT 1;
SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
AND query like '%SELECT x%'
AND query not like '%system.query_log%'
ORDER BY query_start_time DESC, read_rows DESC
LIMIT 1;
DROP TABLE t;