use empty chunk with pk block

This commit is contained in:
jsc0218 2024-09-21 14:53:45 +00:00
parent b4e5c11fd7
commit 82b4986ee3
9 changed files with 77 additions and 51 deletions

View File

@ -1,6 +1,7 @@
#pragma once
#include <Processors/Chunk.h>
#include <Core/Block.h>
namespace DB
{
@ -10,13 +11,16 @@ class MergeTreeReadInfo : public ChunkInfoCloneable<MergeTreeReadInfo>
{
public:
MergeTreeReadInfo() = delete;
explicit MergeTreeReadInfo(size_t part_level, bool virtual_row_) :
origin_merge_tree_part_level(part_level), virtual_row(virtual_row_) { }
explicit MergeTreeReadInfo(size_t part_level) :
origin_merge_tree_part_level(part_level) {}
explicit MergeTreeReadInfo(size_t part_level, const Block & pk_block_) :
origin_merge_tree_part_level(part_level), pk_block(pk_block_) {}
MergeTreeReadInfo(const MergeTreeReadInfo & other) = default;
size_t origin_merge_tree_part_level = 0;
/// If virtual_row is true, the chunk must contain the virtual row only.
bool virtual_row = false;
/// For a virtual row, pk_block must not be empty.
Block pk_block;
};
inline size_t getPartLevelFromChunk(const Chunk & chunk)
@ -27,12 +31,37 @@ inline size_t getPartLevelFromChunk(const Chunk & chunk)
return 0;
}
inline bool getVirtualRowFromChunk(const Chunk & chunk)
inline bool isVirtualRow(const Chunk & chunk)
{
const auto read_info = chunk.getChunkInfos().get<MergeTreeReadInfo>();
if (read_info)
return read_info->virtual_row;
return read_info->pk_block.columns() > 0;
return false;
}
/// Replaces the columns of `chunk` with a single row laid out in the column order of `header`.
///
/// The values come from the `pk_block` stored in the chunk's MergeTreeReadInfo:
///  - a header column whose name (after dropping everything up to and including the last '.')
///    is found in `pk_block` reuses the corresponding `pk_block` column as is;
///  - any other header column is filled with a freshly created column of the header type,
///    resized to one row.
///
/// The chunk must carry a MergeTreeReadInfo; this is only enforced through `chassert`.
/// NOTE(review): columns are matched by name only, the header type is not compared with the
/// type stored in `pk_block` - confirm that this is intended.
inline void setVirtualRow(Chunk & chunk, const Block & header)
{
    const auto read_info = chunk.getChunkInfos().get<MergeTreeReadInfo>();
    chassert(read_info);

    const Block & pk_block = read_info->pk_block;
    const size_t num_columns = header.columns();

    Columns ordered_columns;
    ordered_columns.reserve(num_columns);

    for (size_t i = 0; i < num_columns; ++i)
    {
        const ColumnWithTypeAndName & type_and_name = header.getByPosition(i);

        /// Keep only the part of the name after the last '.', if there is one
        /// (presumably to strip a table qualifier - TODO confirm).
        const size_t pos = type_and_name.name.rfind('.');
        const String column_name = (pos == String::npos) ? type_and_name.name : type_and_name.name.substr(pos + 1);

        /// NOTE(review): the second argument presumably requests case-insensitive lookup - confirm.
        if (const ColumnWithTypeAndName * column = pk_block.findByName(column_name, true))
            ordered_columns.push_back(column->column);
        else
            /// Create the placeholder only when it is needed, not for every header column.
            ordered_columns.push_back(type_and_name.type->createColumn()->cloneResized(1));
    }

    chunk.setColumns(ordered_columns, 1);
}
}

View File

@ -55,6 +55,14 @@ void MergingSortedAlgorithm::addInput()
void MergingSortedAlgorithm::initialize(Inputs inputs)
{
for (auto & input : inputs)
{
if (!isVirtualRow(input.chunk))
continue;
setVirtualRow(input.chunk, header);
}
removeConstAndSparse(inputs);
merged_data.initialize(header, inputs);
current_inputs = std::move(inputs);
@ -139,7 +147,7 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeImpl(TSortingHeap & queue
auto current = queue.current();
if (getVirtualRowFromChunk(current_inputs[current.impl->order].chunk))
if (isVirtualRow(current_inputs[current.impl->order].chunk))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Virtual row is not implemented for Non-batch mode.");
if (current.impl->isLast() && current_inputs[current.impl->order].skip_last_row)
@ -238,7 +246,7 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeBatchImpl(TSortingQueue &
auto [current_ptr, initial_batch_size] = queue.current();
auto current = *current_ptr;
if (getVirtualRowFromChunk(current_inputs[current.impl->order].chunk))
if (isVirtualRow(current_inputs[current.impl->order].chunk))
{
/// If virtual row is detected, there should be only one row as a single chunk,
/// and always skip this chunk to pull the next one.

View File

@ -104,14 +104,14 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
/// we won't have to read any chunks anymore;
/// If virtual row exists, let it pass through, so don't read more chunks.
auto chunk = input.pull(true);
bool virtual_row = getVirtualRowFromChunk(chunk);
bool virtual_row = isVirtualRow(chunk);
if (limit_hint == 0 && !virtual_row)
input.setNeeded();
if (!virtual_row && ((limit_hint && chunk.getNumRows() < limit_hint) || always_read_till_end))
input.setNeeded();
if (!chunk.hasRows())
if (!virtual_row && !chunk.hasRows())
{
if (!input.isFinished())
{

View File

@ -88,7 +88,7 @@ IProcessor::Status BufferChunksTransform::prepare()
Chunk BufferChunksTransform::pullChunk(bool & virtual_row)
{
auto chunk = input.pull();
virtual_row = getVirtualRowFromChunk(chunk);
virtual_row = isVirtualRow(chunk);
if (!virtual_row)
num_processed_rows += chunk.getNumRows();

View File

@ -663,12 +663,25 @@ Pipe ReadFromMergeTree::readInOrder(
if (enable_current_virtual_row && (read_type == ReadType::InOrder))
{
const auto & index = part_with_ranges.data_part->getIndex();
const auto & primary_key = storage_snapshot->metadata->primary_key;
size_t mark_range_begin = part_with_ranges.ranges.front().begin;
ColumnsWithTypeAndName pk_columns;
pk_columns.reserve(index->size());
for (size_t j = 0; j < index->size(); ++j)
{
auto column = primary_key.data_types[j]->createColumn()->cloneEmpty();
column->insert((*(*index)[j])[mark_range_begin]);
pk_columns.push_back({std::move(column), primary_key.data_types[j], primary_key.column_names[j]});
}
Block pk_block(std::move(pk_columns));
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<VirtualRowTransform>(header,
storage_snapshot->metadata->primary_key,
part_with_ranges.data_part->getIndex(),
part_with_ranges.ranges.front().begin);
return std::make_shared<VirtualRowTransform>(header, pk_block);
});
}

View File

@ -9,14 +9,10 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
VirtualRowTransform::VirtualRowTransform(const Block & header_,
const KeyDescription & primary_key_,
const IMergeTreeDataPart::Index & index_,
size_t mark_range_begin_)
VirtualRowTransform::VirtualRowTransform(const Block & header_, const Block & pk_block_)
: IProcessor({header_}, {header_})
, input(inputs.front()), output(outputs.front())
, header(header_), primary_key(primary_key_)
, index(index_), mark_range_begin(mark_range_begin_)
, header(header_), pk_block(pk_block_)
{
}
@ -89,29 +85,16 @@ void VirtualRowTransform::work()
is_first = false;
/// Reorder the columns according to result_header
Columns ordered_columns;
ordered_columns.reserve(header.columns());
for (size_t i = 0, j = 0; i < header.columns(); ++i)
Columns empty_columns;
empty_columns.reserve(header.columns());
for (size_t i = 0; i < header.columns(); ++i)
{
const ColumnWithTypeAndName & type_and_name = header.getByPosition(i);
ColumnPtr current_column = type_and_name.type->createColumn();
// ordered_columns.push_back(current_column->cloneResized(1));
if (j < index->size() && type_and_name.name == primary_key.column_names[j]
&& type_and_name.type == primary_key.data_types[j])
{
auto column = current_column->cloneEmpty();
column->insert((*(*index)[j])[mark_range_begin]);
ordered_columns.push_back(std::move(column));
++j;
}
else
ordered_columns.push_back(current_column->cloneResized(1));
empty_columns.push_back(type_and_name.type->createColumn()->cloneEmpty());
}
current_chunk.setColumns(ordered_columns, 1);
current_chunk.getChunkInfos().add(std::make_shared<MergeTreeReadInfo>(0, true));
current_chunk.setColumns(empty_columns, 0);
current_chunk.getChunkInfos().add(std::make_shared<MergeTreeReadInfo>(0, pk_block));
}
else
{

View File

@ -11,10 +11,7 @@ namespace DB
class VirtualRowTransform : public IProcessor
{
public:
explicit VirtualRowTransform(const Block & header_,
const KeyDescription & primary_key_,
const IMergeTreeDataPart::Index & index_,
size_t mark_range_begin_);
explicit VirtualRowTransform(const Block & header_, const Block & pk_block_);
String getName() const override { return "VirtualRowTransform"; }
@ -32,11 +29,7 @@ private:
bool is_first = true;
Block header;
KeyDescription primary_key;
/// PK index used in virtual row.
IMergeTreeDataPart::Index index;
/// The first range that might contain the candidate.
size_t mark_range_begin;
Block pk_block;
};
}

View File

@ -147,7 +147,7 @@ ChunkAndProgress MergeTreeSelectProcessor::read()
auto chunk = Chunk(ordered_columns, res.row_count);
if (add_part_level)
chunk.getChunkInfos().add(std::make_shared<MergeTreeReadInfo>(task->getInfo().data_part->info.level, false));
chunk.getChunkInfos().add(std::make_shared<MergeTreeReadInfo>(task->getInfo().data_part->info.level));
return ChunkAndProgress{
.chunk = std::move(chunk),

View File

@ -267,7 +267,7 @@ try
auto result = Chunk(std::move(res_columns), rows_read);
if (add_part_level)
result.getChunkInfos().add(std::make_shared<MergeTreeReadInfo>(data_part->info.level, false));
result.getChunkInfos().add(std::make_shared<MergeTreeReadInfo>(data_part->info.level));
return result;
}
}