FINAL to not compare rows from same non-L0 part

Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>
This commit is contained in:
Duc Canh Le 2023-12-22 02:43:05 +00:00
parent 2dcffcc41c
commit 17ed277191
8 changed files with 51 additions and 12 deletions

View File

@ -19,8 +19,8 @@ Chunk::Chunk(DB::Columns columns_, UInt64 num_rows_) : columns(std::move(columns
checkNumRowsIsConsistent();
}
Chunk::Chunk(Columns columns_, UInt64 num_rows_, ChunkInfoPtr chunk_info_)
: columns(std::move(columns_)), num_rows(num_rows_), chunk_info(std::move(chunk_info_))
Chunk::Chunk(Columns columns_, UInt64 num_rows_, ChunkInfoPtr chunk_info_, int part_level_)
: columns(std::move(columns_)), num_rows(num_rows_), chunk_info(std::move(chunk_info_)), origin_merge_tree_part_level(part_level_)
{
checkNumRowsIsConsistent();
}
@ -41,15 +41,15 @@ Chunk::Chunk(MutableColumns columns_, UInt64 num_rows_)
checkNumRowsIsConsistent();
}
Chunk::Chunk(MutableColumns columns_, UInt64 num_rows_, ChunkInfoPtr chunk_info_)
: columns(unmuteColumns(std::move(columns_))), num_rows(num_rows_), chunk_info(std::move(chunk_info_))
Chunk::Chunk(MutableColumns columns_, UInt64 num_rows_, ChunkInfoPtr chunk_info_, int part_level_)
: columns(unmuteColumns(std::move(columns_))), num_rows(num_rows_), chunk_info(std::move(chunk_info_)), origin_merge_tree_part_level(part_level_)
{
checkNumRowsIsConsistent();
}
Chunk Chunk::clone() const
{
return Chunk(getColumns(), getNumRows(), chunk_info);
return Chunk(getColumns(), getNumRows(), chunk_info, origin_merge_tree_part_level);
}
void Chunk::setColumns(Columns columns_, UInt64 num_rows_)
@ -234,4 +234,9 @@ Chunk cloneConstWithDefault(const Chunk & chunk, size_t num_rows)
return Chunk(std::move(columns), num_rows);
}
bool Chunk::mayContainRowsWithSamePrimaryKeys() const
{
return origin_merge_tree_part_level < 1;
}
}

View File

@ -38,14 +38,16 @@ public:
: columns(std::move(other.columns))
, num_rows(other.num_rows)
, chunk_info(std::move(other.chunk_info))
, origin_merge_tree_part_level(other.origin_merge_tree_part_level)
{
other.num_rows = 0;
other.origin_merge_tree_part_level = -1;
}
Chunk(Columns columns_, UInt64 num_rows_);
Chunk(Columns columns_, UInt64 num_rows_, ChunkInfoPtr chunk_info_);
Chunk(Columns columns_, UInt64 num_rows_, ChunkInfoPtr chunk_info_, int part_level_ = -1);
Chunk(MutableColumns columns_, UInt64 num_rows_);
Chunk(MutableColumns columns_, UInt64 num_rows_, ChunkInfoPtr chunk_info_);
Chunk(MutableColumns columns_, UInt64 num_rows_, ChunkInfoPtr chunk_info_, int part_level_ = -1);
Chunk & operator=(const Chunk & other) = delete;
Chunk & operator=(Chunk && other) noexcept
@ -53,7 +55,9 @@ public:
columns = std::move(other.columns);
chunk_info = std::move(other.chunk_info);
num_rows = other.num_rows;
origin_merge_tree_part_level = other.origin_merge_tree_part_level;
other.num_rows = 0;
other.origin_merge_tree_part_level = -1;
return *this;
}
@ -64,6 +68,7 @@ public:
columns.swap(other.columns);
chunk_info.swap(other.chunk_info);
std::swap(num_rows, other.num_rows);
std::swap(origin_merge_tree_part_level, other.origin_merge_tree_part_level);
}
void clear()
@ -71,6 +76,7 @@ public:
num_rows = 0;
columns.clear();
chunk_info.reset();
origin_merge_tree_part_level = -1;
}
const Columns & getColumns() const { return columns; }
@ -104,11 +110,17 @@ public:
void append(const Chunk & chunk);
void append(const Chunk & chunk, size_t from, size_t length); // append rows [from, from+length) of chunk
/// Only use in FINAL
bool mayContainRowsWithSamePrimaryKeys() const;
private:
Columns columns;
UInt64 num_rows = 0;
ChunkInfoPtr chunk_info;
/// It can be non-negative when Chunk produced by a merge tree source
Int32 origin_merge_tree_part_level = -1;
void checkNumRowsIsConsistent();
};

View File

@ -305,7 +305,7 @@ IMergingAlgorithm::Status AggregatingSortedAlgorithm::merge()
{
detail::RowRef current_key;
current_key.set(current);
setRowRef(current_key, current);
key_differs = last_key.empty() || !last_key.hasEqualSortColumnsWith(current_key);

View File

@ -24,6 +24,7 @@ protected:
void initializeQueue(Inputs inputs);
void updateCursor(Input & input, size_t source_num);
bool skipLastRowFor(size_t input_number) const { return current_inputs[input_number].skip_last_row; }
void setRowRef(detail::RowRef & row, SortCursor & cursor) { row.set(cursor, current_inputs[cursor.impl->order].chunk.mayContainRowsWithSamePrimaryKeys()); }
private:
Block header;

View File

@ -129,14 +129,19 @@ struct RowRef
size_t num_columns = 0;
UInt64 row_num = 0;
int source_stream_index = -1;
bool source_may_contain_rows_with_same_primary_keys = true;
bool empty() const { return sort_columns == nullptr; }
void reset() { sort_columns = nullptr; }
void set(SortCursor & cursor)
void set(SortCursor & cursor, bool source_may_contain_rows_with_same_primary_keys_ = false)
{
sort_columns = cursor.impl->sort_columns.data();
num_columns = cursor.impl->sort_columns.size();
row_num = cursor.impl->getRow();
source_stream_index = static_cast<int>(cursor.impl->order);
source_may_contain_rows_with_same_primary_keys = source_may_contain_rows_with_same_primary_keys_;
}
static bool checkEquals(size_t size, const IColumn ** lhs, size_t lhs_row, const IColumn ** rhs, size_t rhs_row)
@ -155,6 +160,11 @@ struct RowRef
bool hasEqualSortColumnsWith(const RowRef & other) const
{
/// If both chunks are from a same source stream, and stream doesn't contains row with same primary keys
/// (e.g. stream read from a non-L0 part), then we don't need to compare them.
if (source_stream_index >= 0 && source_stream_index == other.source_stream_index && !source_may_contain_rows_with_same_primary_keys)
return false;
return checkEquals(num_columns, sort_columns, row_num, other.sort_columns, other.row_num);
}
};
@ -171,12 +181,15 @@ struct RowRefWithOwnedChunk
ColumnRawPtrs * sort_columns = nullptr;
UInt64 row_num = 0;
int source_stream_index = -1;
void swap(RowRefWithOwnedChunk & other)
{
owned_chunk.swap(other.owned_chunk);
std::swap(all_columns, other.all_columns);
std::swap(sort_columns, other.sort_columns);
std::swap(row_num, other.row_num);
std::swap(source_stream_index, other.source_stream_index);
}
bool empty() const { return owned_chunk == nullptr; }
@ -187,6 +200,7 @@ struct RowRefWithOwnedChunk
all_columns = nullptr;
sort_columns = nullptr;
row_num = 0;
source_stream_index = -1;
}
void set(SortCursor & cursor, SharedChunkPtr chunk)
@ -195,10 +209,16 @@ struct RowRefWithOwnedChunk
row_num = cursor.impl->getRow();
all_columns = &owned_chunk->all_columns;
sort_columns = &owned_chunk->sort_columns;
source_stream_index = static_cast<int>(cursor.impl->order);
}
bool hasEqualSortColumnsWith(const RowRefWithOwnedChunk & other) const
{
/// If both chunks are from a same source stream, and stream doesn't contains row with same primary keys
/// (e.g. stream read from a non-L0 part), then we don't need to compare them.
if (source_stream_index >= 0 && source_stream_index == other.source_stream_index && !owned_chunk->mayContainRowsWithSamePrimaryKeys())
return false;
return RowRef::checkEquals(sort_columns->size(), sort_columns->data(), row_num,
other.sort_columns->data(), other.row_num);
}

View File

@ -737,7 +737,7 @@ IMergingAlgorithm::Status SummingSortedAlgorithm::merge()
{
detail::RowRef current_key;
current_key.set(current);
setRowRef(current_key, current);
key_differs = last_key.empty() || !last_key.hasEqualSortColumnsWith(current_key);

View File

@ -171,9 +171,10 @@ ChunkAndProgress MergeTreeSelectProcessor::read()
auto name = result_header.getByPosition(i).name;
ordered_columns.push_back(res.block.getByName(name).column);
}
auto level = task->getInfo().data_part->info.level;
return ChunkAndProgress{
.chunk = Chunk(ordered_columns, res.row_count),
.chunk = Chunk(ordered_columns, res.row_count, nullptr, level),
.num_read_rows = res.num_read_rows,
.num_read_bytes = res.num_read_bytes,
.is_finished = false};

View File

@ -206,7 +206,7 @@ try
++it;
}
return Chunk(std::move(res_columns), rows_read);
return Chunk(std::move(res_columns), rows_read, nullptr, data_part->info.level);
}
}
else