Merge pull request #45681 from CurtizJ/compact-parts-vertical-merge

Allow vertical merges from compact to wide parts
Nikolai Kochetov 2023-02-07 17:09:17 +01:00 committed by GitHub
commit 3912f5a333
7 changed files with 59 additions and 19 deletions

src/Storages/MergeTree/IMergeTreeDataPart.h

@@ -110,8 +110,6 @@ public:
     virtual bool isStoredOnRemoteDiskWithZeroCopySupport() const = 0;
-    virtual bool supportsVerticalMerge() const { return false; }
     /// NOTE: Returns zeros if column files are not found in checksums.
     /// Otherwise return information about column size on disk.
     ColumnSize getColumnSize(const String & column_name) const;

src/Storages/MergeTree/MergeTask.cpp

@@ -953,10 +953,10 @@ MergeAlgorithm MergeTask::ExecuteAndFinalizeHorizontalPart::chooseMergeAlgorithm
         return MergeAlgorithm::Horizontal;
     if (ctx->need_remove_expired_values)
         return MergeAlgorithm::Horizontal;
-    for (const auto & part : global_ctx->future_part->parts)
-        if (!part->supportsVerticalMerge() || !isFullPartStorage(part->getDataPartStorage()))
-            return MergeAlgorithm::Horizontal;
+    if (global_ctx->future_part->part_format.part_type != MergeTreeDataPartType::Wide)
+        return MergeAlgorithm::Horizontal;
+    if (global_ctx->future_part->part_format.storage_type != MergeTreeDataPartStorageType::Full)
+        return MergeAlgorithm::Horizontal;
     bool is_supported_storage =
         ctx->merging_params.mode == MergeTreeData::MergingParams::Ordinary ||
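Taken together, the two new conditions replace the old per-source-part check: vertical merge no longer asks whether every source part supports it, only whether the part about to be written is a full-storage wide part. A minimal self-contained sketch of the resulting rule, with simplified stand-ins for the real ClickHouse types:

#include <iostream>

// Simplified stand-ins for MergeTreeDataPartType / MergeTreeDataPartStorageType.
enum class PartType { Wide, Compact, InMemory };
enum class StorageType { Full, Other };

enum class MergeAlgorithm { Horizontal, Vertical };

struct FuturePartFormat
{
    PartType part_type;
    StorageType storage_type;
};

// Vertical merge is gated on the format of the *result* part: it must be
// Wide and stored in full (file-per-column) form. The source parts may be
// compact, which is what this pull request enables.
MergeAlgorithm chooseMergeAlgorithm(FuturePartFormat format)
{
    if (format.part_type != PartType::Wide)
        return MergeAlgorithm::Horizontal;
    if (format.storage_type != StorageType::Full)
        return MergeAlgorithm::Horizontal;
    return MergeAlgorithm::Vertical;
}

int main()
{
    // Merging compact source parts into a wide result can now go vertical.
    FuturePartFormat wide_result{PartType::Wide, StorageType::Full};
    std::cout << (chooseMergeAlgorithm(wide_result) == MergeAlgorithm::Vertical
                      ? "Vertical\n" : "Horizontal\n");
}

The new test at the bottom of this commit exercises both sides of the rule: merging two 40-row compact parts yields an 80-row part that is still compact (min_rows_for_wide_part = 100), so that merge stays horizontal; a third part pushes the result to 120 rows, the result becomes wide, and the merge runs vertically.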

src/Storages/MergeTree/MergeTreeDataPartWide.h

@@ -47,8 +47,6 @@ public:
     bool isStoredOnRemoteDiskWithZeroCopySupport() const override;
-    bool supportsVerticalMerge() const override { return true; }
     String getFileNameForColumn(const NameAndTypePair & column) const override;
     ~MergeTreeDataPartWide() override;

src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp

@@ -135,6 +135,7 @@ static size_t computeIndexGranularityImpl(
 {
     size_t rows_in_block = block.rows();
     size_t index_granularity_for_block;
     if (!can_use_adaptive_index_granularity)
     {
         index_granularity_for_block = fixed_index_granularity_rows;
@@ -143,7 +144,9 @@ static size_t computeIndexGranularityImpl(
     {
         size_t block_size_in_memory = block.bytes();
         if (blocks_are_granules)
+        {
             index_granularity_for_block = rows_in_block;
+        }
         else if (block_size_in_memory >= index_granularity_bytes)
         {
             size_t granules_in_block = block_size_in_memory / index_granularity_bytes;
@@ -155,10 +158,14 @@ static size_t computeIndexGranularityImpl(
             index_granularity_for_block = index_granularity_bytes / size_of_row_in_bytes;
         }
     }
-    /// We should be less or equal than fixed index granularity
-    index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);
-    /// very rare case when index granularity bytes less then single row
+    /// We should be less or equal than fixed index granularity.
+    /// But if block size is a granule size then do not adjust it.
+    /// Granularity greater than fixed granularity might come from compact part.
+    if (!blocks_are_granules)
+        index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);
+    /// Very rare case when index granularity bytes less than single row.
     if (index_granularity_for_block == 0)
         index_granularity_for_block = 1;
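The blocks_are_granules condition matters here because a vertical merge reads whole granules back from the source parts; when the sources are compact, such a granule may hold more rows than the fixed index granularity of the new part, and clamping it would break the correspondence between blocks and marks. A compilable sketch of the adjusted computation, with the block replaced by its row count and byte size (the two adaptive branches are reconstructed from the visible context and may differ in detail from the real function):

#include <algorithm>
#include <cstddef>

// Sketch of the decision in computeIndexGranularityImpl. Parameter names
// follow the diff; assumes a non-empty block and index_granularity_bytes > 0.
size_t computeIndexGranularityForBlock(
    size_t rows_in_block,
    size_t block_size_in_memory,
    size_t fixed_index_granularity_rows,
    size_t index_granularity_bytes,
    bool can_use_adaptive_index_granularity,
    bool blocks_are_granules)
{
    size_t index_granularity_for_block;
    if (!can_use_adaptive_index_granularity)
    {
        index_granularity_for_block = fixed_index_granularity_rows;
    }
    else if (blocks_are_granules)
    {
        // The whole block becomes a single granule, however many rows it has.
        index_granularity_for_block = rows_in_block;
    }
    else if (block_size_in_memory >= index_granularity_bytes)
    {
        size_t granules_in_block = block_size_in_memory / index_granularity_bytes;
        index_granularity_for_block = rows_in_block / granules_in_block;
    }
    else
    {
        size_t size_of_row_in_bytes = std::max<size_t>(block_size_in_memory / rows_in_block, 1);
        index_granularity_for_block = index_granularity_bytes / size_of_row_in_bytes;
    }

    // The clamp is now skipped when blocks are granules: an oversized granule
    // re-read from a compact source part is legitimate in that mode.
    if (!blocks_are_granules)
        index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);

    // index_granularity_bytes may be smaller than a single row.
    return index_granularity_for_block == 0 ? 1 : index_granularity_for_block;
}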

src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp

@@ -484,15 +484,8 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
             column->size(), mark_num, index_granularity.getMarksCount(), index_granularity_rows);
     }
-    if (index_granularity_rows > data_part->index_granularity_info.fixed_index_granularity)
-    {
-        throw Exception(ErrorCodes::LOGICAL_ERROR,
-            "Mark #{} has {} rows, but max fixed granularity is {}, index granularity size {}",
-            mark_num, index_granularity_rows, data_part->index_granularity_info.fixed_index_granularity,
-            index_granularity.getMarksCount());
-    }
     if (index_granularity_rows != index_granularity.getMarkRows(mark_num))
     {
         throw Exception(
             ErrorCodes::LOGICAL_ERROR,
             "Incorrect mark rows for part {} for mark #{}"
@@ -501,6 +494,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
             mark_num, offset_in_compressed_file, offset_in_decompressed_block,
             index_granularity.getMarkRows(mark_num), index_granularity_rows,
             index_granularity.getMarksCount());
     }
 
     auto column = type->createColumn();
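With oversized granules now legal in wide parts (they can be carried over from compact parts by a vertical merge), the index_granularity_rows > fixed_index_granularity check above is removed rather than relaxed; only a mismatch between the marks file and the in-memory granularity remains a logical error. A condensed, hypothetical rendering of the surviving invariant, using standard exceptions instead of ClickHouse's Exception:

#include <cstddef>
#include <stdexcept>
#include <string>

// Hypothetical condensed form of the check that validateColumnOfFixedSize
// still performs: the row count recorded for a mark on disk must match the
// in-memory index granularity exactly. The stricter comparison against the
// fixed granularity was dropped by this commit.
void validateMarkRows(size_t mark_num, size_t rows_in_mark_file, size_t rows_in_memory)
{
    if (rows_in_mark_file != rows_in_memory)
        throw std::logic_error(
            "Incorrect mark rows for mark #" + std::to_string(mark_num)
            + ": " + std::to_string(rows_in_mark_file) + " in marks file, "
            + std::to_string(rows_in_memory) + " in memory");
}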

New file (test reference)

@@ -0,0 +1,2 @@
1 2 MergeParts Horizontal Compact
1 3 MergeParts Vertical Wide

New file (test queries)

@@ -0,0 +1,41 @@
DROP TABLE IF EXISTS t_compact_vertical_merge;
CREATE TABLE t_compact_vertical_merge (id UInt64, s LowCardinality(String), arr Array(UInt64))
ENGINE MergeTree ORDER BY id
SETTINGS
index_granularity = 16,
min_bytes_for_wide_part = 0,
min_rows_for_wide_part = 100,
vertical_merge_algorithm_min_rows_to_activate = 1,
vertical_merge_algorithm_min_columns_to_activate = 1;
INSERT INTO t_compact_vertical_merge SELECT number, toString(number), range(number % 10) FROM numbers(40);
INSERT INTO t_compact_vertical_merge SELECT number, toString(number), range(number % 10) FROM numbers(40);
OPTIMIZE TABLE t_compact_vertical_merge FINAL;
SYSTEM FLUSH LOGS;
WITH splitByChar('_', part_name) AS name_parts,
name_parts[2]::UInt64 AS min_block,
name_parts[3]::UInt64 AS max_block
SELECT min_block, max_block, event_type, merge_algorithm, part_type FROM system.part_log
WHERE
database = currentDatabase() AND
table = 't_compact_vertical_merge' AND
min_block = 1 AND max_block = 2;
INSERT INTO t_compact_vertical_merge SELECT number, toString(number), range(number % 10) FROM numbers(40);
OPTIMIZE TABLE t_compact_vertical_merge FINAL;
SYSTEM FLUSH LOGS;
WITH splitByChar('_', part_name) AS name_parts,
name_parts[2]::UInt64 AS min_block,
name_parts[3]::UInt64 AS max_block
SELECT min_block, max_block, event_type, merge_algorithm, part_type FROM system.part_log
WHERE
database = currentDatabase() AND
table = 't_compact_vertical_merge' AND
min_block = 1 AND max_block = 3;
DROP TABLE t_compact_vertical_merge;