add more check

This commit is contained in:
jsc0218 2024-05-15 18:26:28 +00:00
parent 8f8ba55ac3
commit 3f6cdeb880
8 changed files with 92 additions and 33 deletions

View File

@ -8,6 +8,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
MergingSortedAlgorithm::MergingSortedAlgorithm(
Block header_,
size_t num_inputs,
@ -134,6 +139,9 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeImpl(TSortingHeap & queue
auto current = queue.current();
if (getVirtualRowFromChunk(current_inputs[current.impl->order].chunk))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Virtual row is not implemented for Non-batch mode.");
if (current.impl->isLast() && current_inputs[current.impl->order].skip_last_row)
{
/// Get the next block from the corresponding source, if there is one.

View File

@ -241,38 +241,57 @@ void SortingStep::finishSorting(
});
}
void SortingStep::enableVirtualRow(const QueryPipelineBuilder & pipeline) const
{
/// We check every step of this pipeline, to make sure virtual row can work correctly.
/// Currently ExpressionTransform is supported, should add other processors if possible.
const auto& pipe = pipeline.getPipe();
bool enable_virtual_row = true;
std::vector<std::shared_ptr<MergeTreeSource>> merge_tree_sources;
for (const auto & processor : pipe.getProcessors())
{
if (auto merge_tree_source = std::dynamic_pointer_cast<MergeTreeSource>(processor))
{
merge_tree_sources.push_back(merge_tree_source);
}
else if (!std::dynamic_pointer_cast<ExpressionTransform>(processor))
{
enable_virtual_row = false;
break;
}
}
/// If everything is okay, we enable virtual row in MergeTreeSelectProcessor
if (enable_virtual_row && merge_tree_sources.size() >= 2)
{
/// We have to check further in the case of fixed prefix, for example,
/// primary key ab, query SELECT a, b FROM t WHERE a = 1 ORDER BY b,
/// merge sort would sort based on b, leading to wrong result in comparison.
auto extractNameAfterDot = [](const String & name)
{
size_t pos = name.find_last_of('.');
return (pos != String::npos) ? name.substr(pos + 1) : name;
};
const ColumnWithTypeAndName & type_and_name = pipeline.getHeader().getByPosition(0);
String column_name = extractNameAfterDot(type_and_name.name);
for (const auto & merge_tree_source : merge_tree_sources)
{
const auto& merge_tree_select_processor = merge_tree_source->getProcessor();
const auto & primary_key = merge_tree_select_processor->getPrimaryKey();
if (primary_key.column_names[0] == column_name && primary_key.data_types[0] == type_and_name.type)
merge_tree_select_processor->enableVirtualRow();
}
}
}
void SortingStep::mergingSorted(QueryPipelineBuilder & pipeline, const SortDescription & result_sort_desc, const UInt64 limit_)
{
/// If there are several streams, then we merge them into one
if (pipeline.getNumStreams() > 1)
{
/// We check every step of this pipeline, to make sure virtual row can work correctly.
/// Currently ExpressionTransform is supported, should add other processors if possible.
const auto& pipe = pipeline.getPipe();
bool enable_virtual_row = true;
std::vector<std::shared_ptr<MergeTreeSource>> merge_tree_sources;
for (const auto & processor : pipe.getProcessors())
{
if (auto merge_tree_source = std::dynamic_pointer_cast<MergeTreeSource>(processor))
{
merge_tree_sources.push_back(merge_tree_source);
}
else if (!std::dynamic_pointer_cast<ExpressionTransform>(processor))
{
enable_virtual_row = false;
break;
}
}
/// If everything is okay, we enable virtual row in MergeTreeSelectProcessor
if (enable_virtual_row && merge_tree_sources.size() >= 2)
{
for (const auto & merge_tree_source : merge_tree_sources)
{
const auto& merge_tree_select_processor = merge_tree_source->getProcessor();
merge_tree_select_processor->enableVirtualRow();
}
}
enableVirtualRow(pipeline);
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),

View File

@ -116,6 +116,8 @@ private:
UInt64 limit_,
bool skip_partial_sort = false);
void enableVirtualRow(const QueryPipelineBuilder & pipeline) const;
Type type;
SortDescription prefix_description;

View File

@ -139,7 +139,7 @@ ChunkAndProgress MergeTreeSelectProcessor::read()
/// Turn on virtual row just once.
enable_virtual_row = false;
const auto & primary_key = storage_snapshot->metadata->primary_key;
const auto & primary_key = getPrimaryKey();
MergeTreeReadTask::BlockAndProgress res;
res.row_count = 1;

View File

@ -73,6 +73,8 @@ public:
void enableVirtualRow() { enable_virtual_row = true; }
const KeyDescription & getPrimaryKey() const { return storage_snapshot->metadata->primary_key; }
private:
/// Sets up range readers corresponding to data readers
void initializeRangeReaders();

View File

@ -195,14 +195,14 @@ INSERT INTO tab VALUES (201, 'rick c01'), (202, 'mick c02'), (203, 'nick c03');
SELECT name, type FROM system.data_skipping_indices WHERE table == 'tab' AND database = currentDatabase() LIMIT 1;
-- search full_text index
SELECT * FROM tab WHERE s LIKE '%01%' ORDER BY k;
SELECT * FROM tab WHERE s LIKE '%01%' ORDER BY k SETTINGS optimize_read_in_order = 1;
-- check the query only read 3 granules (6 rows total; each granule has 2 rows)
-- check the query only read 3 granules (6 rows total; each granule has 2 rows; there are 2 extra virtual rows)
SYSTEM FLUSH LOGS;
SELECT read_rows==6 from system.query_log
SELECT read_rows==8 from system.query_log
WHERE query_kind ='Select'
AND current_database = currentDatabase()
AND endsWith(trimRight(query), 'SELECT * FROM tab WHERE s LIKE \'%01%\' ORDER BY k;')
AND endsWith(trimRight(query), 'SELECT * FROM tab WHERE s LIKE \'%01%\' ORDER BY k SETTINGS optimize_read_in_order = 1;')
AND type='QueryFinish'
AND result_rows==3
LIMIT 1;

View File

@ -3,18 +3,28 @@
2
3
16386
========
16385
16386
16387
16388
24578
========
0
1
2
3
16386
========
16385
16386
16387
16388
24578
========
1 2
1 2
1 3
1 3
1 4
1 4

View File

@ -51,6 +51,7 @@ AND type = 'QueryFinish'
ORDER BY query_start_time DESC
limit 1;
SELECT '========';
-- Expecting 2 virtual rows + two chunks (8192*2) get filtered out + one chunk for result (8192),
-- all chunks come from the same part.
SELECT k
@ -74,6 +75,7 @@ AND type = 'QueryFinish'
ORDER BY query_start_time DESC
LIMIT 1;
SELECT '========';
-- Expecting 2 virtual rows + one chunk (8192) for result + one extra chunk for next consumption in merge transform (8192),
-- both chunks come from the same part.
SELECT x
@ -96,6 +98,7 @@ AND type = 'QueryFinish'
ORDER BY query_start_time DESC
LIMIT 1;
SELECT '========';
-- Expecting 2 virtual rows + two chunks (8192*2) get filtered out + one chunk for result (8192),
-- all chunks come from the same part.
SELECT k
@ -119,4 +122,19 @@ AND type = 'QueryFinish'
ORDER BY query_start_time DESC
LIMIT 1;
DROP TABLE t;
DROP TABLE t;
SELECT '========';
-- from 02149_read_in_order_fixed_prefix
DROP TABLE IF EXISTS t_read_in_order;
CREATE TABLE t_read_in_order(a UInt32, b UInt32)
ENGINE = MergeTree ORDER BY (a, b)
SETTINGS index_granularity = 3;
SYSTEM STOP MERGES t_read_in_order;
INSERT INTO t_read_in_order VALUES (0, 100), (1, 2), (1, 3), (1, 4), (2, 5);
INSERT INTO t_read_in_order VALUES (0, 100), (1, 2), (1, 3), (1, 4), (2, 5);
SELECT a, b FROM t_read_in_order WHERE a = 1 ORDER BY b SETTINGS max_threads = 1;