Move virtual row generation from MergeTreeSelectProcessor into VirtualRowTransform

This commit is contained in:
jsc0218 2024-09-08 00:31:02 +00:00
parent 67ad7b592c
commit 36f62334c4
8 changed files with 90 additions and 172 deletions

View File

@ -615,15 +615,25 @@ Pipe ReadFromMergeTree::readInOrder(
actions_settings, block_size, reader_settings);
processor->addPartLevelToChunk(isQueryWithFinal());
processor->addVirtualRowToChunk(part_with_ranges.data_part->getIndex(), part_with_ranges.ranges.front().begin);
if (need_virtual_row)
processor->enableVirtualRow();
auto source = std::make_shared<MergeTreeSource>(std::move(processor), data.getLogName());
if (set_total_rows_approx)
source->addTotalRowsApprox(total_rows);
pipes.emplace_back(std::move(source));
Pipe pipe(source);
if (need_virtual_row)
{
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<VirtualRowTransform>(header,
storage_snapshot->metadata->primary_key,
part_with_ranges.data_part->getIndex(),
part_with_ranges.ranges.front().begin);
});
}
pipes.emplace_back(std::move(pipe));
}
auto pipe = Pipe::unitePipes(std::move(pipes));
@ -636,8 +646,6 @@ Pipe ReadFromMergeTree::readInOrder(
});
}
pipe.addSimpleTransform([](const Block & header){ return std::make_shared<VirtualRowTransform>(header); });
return pipe;
}

View File

@ -1,5 +1,5 @@
#include <Processors/Transforms/VirtualRowTransform.h>
#include "Processors/Chunk.h"
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
namespace DB
{
@ -9,9 +9,14 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
VirtualRowTransform::VirtualRowTransform(const Block & header)
: IProcessor({header}, {header})
VirtualRowTransform::VirtualRowTransform(const Block & header_,
const KeyDescription & primary_key_,
const IMergeTreeDataPart::Index & index_,
size_t mark_range_begin_)
: IProcessor({header_}, {header_})
, input(inputs.front()), output(outputs.front())
, header(header_), primary_key(primary_key_)
, index(index_), mark_range_begin(mark_range_begin_)
{
}
@ -72,41 +77,50 @@ void VirtualRowTransform::work()
if (generated)
throw Exception(ErrorCodes::LOGICAL_ERROR, "VirtualRowTransform cannot consume chunk because it already was generated");
current_chunk = generate();
generated = true;
can_generate = false;
if (!is_first)
{
if (current_chunk.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't generate chunk in VirtualRowTransform");
return;
}
is_first = false;
/// Reorder the columns according to result_header
Columns ordered_columns;
ordered_columns.reserve(header.columns());
for (size_t i = 0, j = 0; i < header.columns(); ++i)
{
const ColumnWithTypeAndName & type_and_name = header.getByPosition(i);
ColumnPtr current_column = type_and_name.type->createColumn();
// ordered_columns.push_back(current_column->cloneResized(1));
if (j < index->size() && type_and_name.name == primary_key.column_names[j]
&& type_and_name.type == primary_key.data_types[j])
{
auto column = current_column->cloneEmpty();
column->insert((*(*index)[j])[mark_range_begin]);
ordered_columns.push_back(std::move(column));
++j;
}
else
ordered_columns.push_back(current_column->cloneResized(1));
}
current_chunk.setColumns(ordered_columns, 1);
current_chunk.getChunkInfos().add(std::make_shared<MergeTreeReadInfo>(0, true));
}
else
{
if (!has_input)
throw Exception(ErrorCodes::LOGICAL_ERROR, "VirtualRowTransform cannot consume chunk because it wasn't read");
consume(std::move(current_chunk));
has_input = false;
can_generate = true;
}
}
/// Buffers the incoming chunk in temp_chunk so that a later generate() call can hand it out.
/// As the lines read here, both paths store the chunk; the only difference is that the
/// path taken while is_first is set also clears is_first.
/// NOTE(review): this text comes from a diff view whose +/- markers were stripped, so removed
/// and added lines may be interleaved in this body -- confirm against the actual file.
void VirtualRowTransform::consume(Chunk chunk)
{
if (!is_first)
{
temp_chunk = std::move(chunk);
return;
}
is_first = false;
temp_chunk = std::move(chunk);
}
/// Returns the chunk currently buffered in temp_chunk.
/// Throws LOGICAL_ERROR if temp_chunk is empty, i.e. there is nothing to hand out.
Chunk VirtualRowTransform::generate()
{
if (temp_chunk.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't generate chunk in VirtualRowTransform");
Chunk result;
/// Swap with a default-constructed Chunk: the buffered data moves into result and
/// temp_chunk is left holding the default-constructed state.
result.swap(temp_chunk);
return result;
}
}

View File

@ -1,15 +1,20 @@
#pragma once
#include <Processors/IInflatingTransform.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/IProcessor.h>
#include <Storages/KeyDescription.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
namespace DB
{
/// Virtual row is useful for read-in-order optimization when multiple parts exist.
class VirtualRowTransform : public IProcessor
{
public:
explicit VirtualRowTransform(const Block & header);
explicit VirtualRowTransform(const Block & header_,
const KeyDescription & primary_key_,
const IMergeTreeDataPart::Index & index_,
size_t mark_range_begin_);
String getName() const override { return "VirtualRowTransform"; }
@ -17,19 +22,21 @@ public:
void work() override;
private:
void consume(Chunk chunk);
Chunk generate();
InputPort & input;
OutputPort & output;
Chunk current_chunk;
bool has_input = false;
bool generated = false;
bool can_generate = false;
bool can_generate = true;
bool is_first = true;
bool is_first = false;
Chunk temp_chunk;
Block header;
KeyDescription primary_key;
/// PK index used in virtual row.
IMergeTreeDataPart::Index index;
/// The first range that might contain the candidate.
size_t mark_range_begin;
};
}

View File

@ -134,38 +134,22 @@ ChunkAndProgress MergeTreeSelectProcessor::read()
if (!task->getMainRangeReader().isInitialized())
initializeRangeReaders();
if (enable_virtual_row)
auto res = algorithm->readFromTask(*task, block_size_params);
if (res.row_count)
{
/// Turn on virtual row just once.
enable_virtual_row = false;
const auto & primary_key = getPrimaryKey();
MergeTreeReadTask::BlockAndProgress res;
res.row_count = 1;
/// Reorder the columns according to result_header
Columns ordered_columns;
ordered_columns.reserve(result_header.columns());
for (size_t i = 0, j = 0; i < result_header.columns(); ++i)
for (size_t i = 0; i < result_header.columns(); ++i)
{
const ColumnWithTypeAndName & type_and_name = result_header.getByPosition(i);
ColumnPtr current_column = type_and_name.type->createColumn();
if (j < index->size() && type_and_name.name == primary_key.column_names[j] && type_and_name.type == primary_key.data_types[j])
{
auto column = current_column->cloneEmpty();
column->insert((*(*index)[j])[mark_range_begin]);
ordered_columns.push_back(std::move(column));
++j;
}
else
ordered_columns.push_back(current_column->cloneResized(1));
auto name = result_header.getByPosition(i).name;
ordered_columns.push_back(res.block.getByName(name).column);
}
auto chunk = Chunk(ordered_columns, res.row_count);
chunk.getChunkInfos().add(std::make_shared<MergeTreeReadInfo>(
add_part_level ? task->getInfo().data_part->info.level : 0, true));
if (add_part_level)
chunk.getChunkInfos().add(std::make_shared<MergeTreeReadInfo>(task->getInfo().data_part->info.level, false));
return ChunkAndProgress{
.chunk = std::move(chunk),
@ -175,33 +159,7 @@ ChunkAndProgress MergeTreeSelectProcessor::read()
}
else
{
auto res = algorithm->readFromTask(*task, block_size_params);
if (res.row_count)
{
/// Reorder the columns according to result_header
Columns ordered_columns;
ordered_columns.reserve(result_header.columns());
for (size_t i = 0; i < result_header.columns(); ++i)
{
auto name = result_header.getByPosition(i).name;
ordered_columns.push_back(res.block.getByName(name).column);
}
auto chunk = Chunk(ordered_columns, res.row_count);
if (add_part_level)
chunk.getChunkInfos().add(std::make_shared<MergeTreeReadInfo>(task->getInfo().data_part->info.level, false));
return ChunkAndProgress{
.chunk = std::move(chunk),
.num_read_rows = res.num_read_rows,
.num_read_bytes = res.num_read_bytes,
.is_finished = false};
}
else
{
return {Chunk(), res.num_read_rows, res.num_read_bytes, false};
}
return {Chunk(), res.num_read_rows, res.num_read_bytes, false};
}
}

View File

@ -60,12 +60,6 @@ public:
void addPartLevelToChunk(bool add_part_level_) { add_part_level = add_part_level_; }
void addVirtualRowToChunk(const IMergeTreeDataPart::Index & index_, size_t mark_range_begin_)
{
index = index_;
mark_range_begin = mark_range_begin_;
}
void enableVirtualRow() { enable_virtual_row = true; }
const KeyDescription & getPrimaryKey() const { return storage_snapshot->metadata->primary_key; }
@ -100,8 +94,6 @@ private:
bool enable_virtual_row = false;
/// PK index used in virtual row.
IMergeTreeDataPart::Index index;
/// The first range that might contain the candidate, used in virtual row.
size_t mark_range_begin;
LoggerPtr log = getLogger("MergeTreeSelectProcessor");
std::atomic<bool> is_cancelled{false};

View File

@ -12,6 +12,7 @@ ExpressionTransform × 3
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
ExpressionTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
VirtualRowTransform × 2
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) × 2 0 → 1
ExpressionTransform
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1

View File

@ -2,25 +2,13 @@
1
2
3
16386
16384
========
16385
16386
16387
16388
24578
========
0
1
2
3
16386
========
16385
16386
16387
16388
24578
24576
========
1 2
1 2

View File

@ -39,14 +39,14 @@ SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 0, --force preliminary merge
max_threads = 1,
optimize_read_in_order = 1,
log_comment = 'preliminary merge, no filter';
log_comment = 'no filter';
SYSTEM FLUSH LOGS;
SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
AND log_comment = 'preliminary merge, no filter'
AND log_comment = 'no filter'
AND type = 'QueryFinish'
ORDER BY query_start_time DESC
limit 1;
@ -63,68 +63,18 @@ SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 0, --force preliminary merge
max_threads = 1,
optimize_read_in_order = 1,
log_comment = 'preliminary merge with filter';
log_comment = 'with filter';
SYSTEM FLUSH LOGS;
SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
AND log_comment = 'preliminary merge with filter'
AND log_comment = 'with filter'
AND type = 'QueryFinish'
ORDER BY query_start_time DESC
LIMIT 1;
SELECT '========';
-- Expecting 2 virtual rows + one chunk (8192) for result + one extra chunk for next consumption in merge transform (8192),
-- both chunks come from the same part.
SELECT x
FROM t
ORDER BY x ASC
LIMIT 4
SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 5, --avoid preliminary merge
max_threads = 1,
optimize_read_in_order = 1,
log_comment = 'no preliminary merge, no filter';
SYSTEM FLUSH LOGS;
SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
AND log_comment = 'no preliminary merge, no filter'
AND type = 'QueryFinish'
ORDER BY query_start_time DESC
LIMIT 1;
SELECT '========';
-- Expecting 2 virtual rows + two chunks (8192*2) get filtered out + one chunk for result (8192),
-- all chunks come from the same part.
SELECT k
FROM t
WHERE k > 8192 * 2
ORDER BY x ASC
LIMIT 4
SETTINGS max_block_size = 8192,
read_in_order_two_level_merge_threshold = 5, --avoid preliminary merge
read_in_order_use_buffering = false, --avoid buffer
max_threads = 1,
optimize_read_in_order = 1,
log_comment = 'no preliminary merge, with filter';
SYSTEM FLUSH LOGS;
SELECT read_rows
FROM system.query_log
WHERE current_database = currentDatabase()
AND log_comment = 'no preliminary merge, with filter'
AND type = 'QueryFinish'
ORDER BY query_start_time DESC
LIMIT 1;
DROP TABLE t;
SELECT '========';
-- from 02149_read_in_order_fixed_prefix
DROP TABLE IF EXISTS fixed_prefix;