add check flag

jsc0218 2024-05-14 19:43:47 +00:00
parent 0537b8c833
commit 8f8ba55ac3
9 changed files with 84 additions and 15 deletions

View File

@@ -13,6 +13,7 @@ public:
explicit MergeTreeReadInfo(size_t part_level, bool virtual_row_) :
origin_merge_tree_part_level(part_level), virtual_row(virtual_row_) { }
size_t origin_merge_tree_part_level = 0;
+/// If virtual_row is true, the chunk must contain the virtual row only.
bool virtual_row = false;
};
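For orientation, a minimal sketch of how a downstream consumer might honor this invariant; getReadInfoFromChunk is a hypothetical stand-in for however the chunk info is actually fetched, not an API from this commit:

/// Hypothetical sketch: getReadInfoFromChunk stands in for the real chunk-info lookup.
if (const auto * read_info = getReadInfoFromChunk(chunk))
{
    if (read_info->virtual_row)
    {
        /// By the invariant above, the chunk holds exactly one row: use it to
        /// position the merge cursor cheaply, but never emit it to the result.
    }
}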

View File

@@ -597,8 +597,9 @@ Pipe ReadFromMergeTree::readInOrder(
actions_settings, block_size, reader_settings);
processor->addPartLevelToChunk(isQueryWithFinal());
-processor->addVirtualRowToChunk(need_virtual_row, part_with_ranges.data_part->getIndex(),
-part_with_ranges.ranges.front().begin);
+processor->addVirtualRowToChunk(part_with_ranges.data_part->getIndex(), part_with_ranges.ranges.front().begin);
+if (need_virtual_row)
+processor->enableVirtualRow();
auto source = std::make_shared<MergeTreeSource>(std::move(processor));
if (set_rows_approx)
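The call is split on purpose: the virtual-row payload (the part's primary-key index and the first mark of its first range) is now attached to every processor unconditionally, while actually turning the row on remains a separate decision, made immediately here via need_virtual_row or later by SortingStep once the whole pipeline can be inspected.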

View File

@@ -13,6 +13,9 @@
#include <Processors/ResizeProcessor.h>
#include <Processors/Transforms/ScatterByPartitionTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
+#include <Storages/MergeTree/MergeTreeSource.h>
+#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
namespace CurrentMetrics
{
@@ -243,6 +246,34 @@ void SortingStep::mergingSorted(QueryPipelineBuilder & pipeline, const SortDescr
/// If there are several streams, then we merge them into one
if (pipeline.getNumStreams() > 1)
{
+/// We check every processor in this pipeline to make sure the virtual row can work correctly.
+/// Currently only ExpressionTransform is supported; other processors should be added when possible.
+const auto & pipe = pipeline.getPipe();
+bool enable_virtual_row = true;
+std::vector<std::shared_ptr<MergeTreeSource>> merge_tree_sources;
+for (const auto & processor : pipe.getProcessors())
+{
+if (auto merge_tree_source = std::dynamic_pointer_cast<MergeTreeSource>(processor))
+{
+merge_tree_sources.push_back(merge_tree_source);
+}
+else if (!std::dynamic_pointer_cast<ExpressionTransform>(processor))
+{
+enable_virtual_row = false;
+break;
+}
+}
+/// If everything is okay, enable the virtual row in each MergeTreeSelectProcessor.
+if (enable_virtual_row && merge_tree_sources.size() >= 2)
+{
+for (const auto & merge_tree_source : merge_tree_sources)
+{
+const auto & merge_tree_select_processor = merge_tree_source->getProcessor();
+merge_tree_select_processor->enableVirtualRow();
+}
+}
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
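The merge_tree_sources.size() >= 2 requirement matches the comment in MergeTreeSelectProcessor.h below: the virtual row only pays off for read-in-order merges across multiple parts, so a single source gains nothing. The processor check is deliberately conservative: anything other than a MergeTreeSource or an ExpressionTransform disables the optimization for the whole pipeline, which is why the comment asks for more processors to be supported over time.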

View File

@@ -197,6 +197,8 @@ public:
void setQueryIdHolder(std::shared_ptr<QueryIdHolder> query_id_holder) { resources.query_id_holders.emplace_back(std::move(query_id_holder)); }
void addContext(ContextPtr context) { resources.interpreter_context.emplace_back(std::move(context)); }
+const Pipe & getPipe() const { return pipe; }
/// Convert query pipeline to pipe.
static Pipe getPipe(QueryPipelineBuilder pipeline, QueryPlanResourceHolder & resources);
static QueryPipeline getPipeline(QueryPipelineBuilder builder);
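The new const accessor is what the SortingStep change above relies on; unlike the pre-existing static overload, it does not consume the builder. A rough contrast (builder and resources are illustrative locals):

/// Read-only inspection; the builder stays intact and usable, e.g. for the scan in SortingStep.
const Pipe & pipe = builder.getPipe();
size_t processors = pipe.getProcessors().size();

/// The static overload instead moves the pipeline's contents out into a Pipe.
Pipe owned = QueryPipelineBuilder::getPipe(std::move(builder), resources);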

View File

@@ -134,10 +134,10 @@ ChunkAndProgress MergeTreeSelectProcessor::read()
if (!task->getMainRangeReader().isInitialized())
initializeRangeReaders();
-if (add_virtual_row)
+if (enable_virtual_row)
{
/// Turn on virtual row just once.
-add_virtual_row = false;
+enable_virtual_row = false;
const auto & primary_key = storage_snapshot->metadata->primary_key;
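The consume-once semantics survive the rename: the flag is cleared on the first read(), so the virtual row is emitted exactly once per processor, ahead of any real chunk. "Enable" rather than "add" reflects that the flag can now also be switched on after construction, by SortingStep.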

View File

@@ -65,13 +65,14 @@ public:
void addPartLevelToChunk(bool add_part_level_) { add_part_level = add_part_level_; }
-void addVirtualRowToChunk(bool add_virtual_row_, const IMergeTreeDataPart::Index & index_, size_t mark_range_begin_)
+void addVirtualRowToChunk(const IMergeTreeDataPart::Index & index_, size_t mark_range_begin_)
{
-add_virtual_row = add_virtual_row_;
index = index_;
mark_range_begin = mark_range_begin_;
}
+void enableVirtualRow() { enable_virtual_row = true; }
private:
/// Sets up range readers corresponding to data readers
void initializeRangeReaders();
@@ -99,7 +100,7 @@ private:
/// Should we add a virtual row as the single first chunk.
/// Virtual row is useful for read-in-order optimization when multiple parts exist.
-bool add_virtual_row = false;
+bool enable_virtual_row = false;
/// PK index used in virtual row.
IMergeTreeDataPart::Index index;
/// The first range that might contain the candidate, used in virtual row.
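Taken together, the intended calling pattern (mirroring ReadFromMergeTree::readInOrder above) is roughly:

/// Attaching the data is unconditional...
processor->addVirtualRowToChunk(part_with_ranges.data_part->getIndex(),
    part_with_ranges.ranges.front().begin);
/// ...while switching the row on is a separate decision, taken here by the
/// caller or later by SortingStep via MergeTreeSource::getProcessor().
if (need_virtual_row)
    processor->enableVirtualRow();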

View File

@@ -19,6 +19,8 @@ public:
Status prepare() override;
+const MergeTreeSelectProcessorPtr & getProcessor() const { return processor; }
#if defined(OS_LINUX)
int schedule() override;
#endif
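This is the accessor SortingStep uses to reach through a MergeTreeSource and call enableVirtualRow() on the underlying MergeTreeSelectProcessor after the plan has been assembled.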

View File

@@ -12,4 +12,9 @@
1
2
3
-16384
+16386
+16385
+16386
+16387
+16388
+24578
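These values line up with the comments in the test below: 16386 = 2 virtual rows + 8192 * 2 read rows for the unfiltered queries, 16385 through 16388 are the four k values returned by the new filtered query, and 24578 = 2 virtual rows + 8192 * 3, since two chunks are filtered out and one serves the result.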

View File

@@ -27,6 +27,8 @@ INSERT INTO t SELECT
number
FROM numbers(8192 * 3);
SYSTEM STOP MERGES t;
+-- Expecting 2 virtual rows + one chunk (8192) for result + one extra chunk for next consumption in merge transform (8192),
+-- both chunks come from the same part.
SELECT x
@@ -37,14 +39,14 @@ SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 0, --force preliminary merge
max_threads = 1,
optimize_read_in_order = 1,
-log_comment = 'no filter';
+log_comment = 'preliminary merge, no filter';
SYSTEM FLUSH LOGS;
SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
-AND log_comment = 'no filter'
+AND log_comment = 'preliminary merge, no filter'
AND type = 'QueryFinish'
ORDER BY query_start_time DESC
LIMIT 1;
@@ -60,19 +62,20 @@ SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 0, --force preliminary merge
max_threads = 1,
optimize_read_in_order = 1,
-log_comment = 'with filter';
+log_comment = 'preliminary merge with filter';
SYSTEM FLUSH LOGS;
SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
-AND log_comment = 'with filter'
+AND log_comment = 'preliminary merge with filter'
AND type = 'QueryFinish'
ORDER BY query_start_time DESC
LIMIT 1;
--- Should not impact cases without preliminary merge (might read again when chunk row is less than limit)
+-- Expecting 2 virtual rows + one chunk (8192) for result + one extra chunk for next consumption in merge transform (8192),
+-- both chunks come from the same part.
SELECT x
FROM t
ORDER BY x ASC
@@ -81,14 +84,37 @@ SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 5, --avoid preliminary merge
max_threads = 1,
optimize_read_in_order = 1,
-log_comment = 'no impact';
+log_comment = 'no preliminary merge, no filter';
SYSTEM FLUSH LOGS;
SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
-AND log_comment = 'no impact'
+AND log_comment = 'no preliminary merge, no filter'
AND type = 'QueryFinish'
ORDER BY query_start_time DESC
LIMIT 1;
+-- Expecting 2 virtual rows + two chunks (8192*2) get filtered out + one chunk for result (8192),
+-- all chunks come from the same part.
+SELECT k
+FROM t
+WHERE k > 8192 * 2
+ORDER BY x ASC
+LIMIT 4
+SETTINGS max_block_size = 8192,
+read_in_order_two_level_merge_threshold = 5, --avoid preliminary merge
+max_threads = 1,
+optimize_read_in_order = 1,
+log_comment = 'no preliminary merge, with filter';
+SYSTEM FLUSH LOGS;
+SELECT read_rows
+FROM system.query_log
+WHERE current_database = currentDatabase()
+AND log_comment = 'no preliminary merge, with filter'
+AND type = 'QueryFinish'
+ORDER BY query_start_time DESC
+LIMIT 1;
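As a cross-check of the final expected value: the filter k > 8192 * 2 reads and discards the first two chunks (8192 * 2 rows), the third chunk (8192 rows) serves the LIMIT 4 result, and the two virtual rows account for the remainder: 2 + 8192 * 3 = 24578, the last line of the reference file above.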