Backport #72211 to 24.10: Fix calculating dynamic columns sizes on vertical merge

This commit is contained in:
robot-clickhouse 2024-12-03 14:07:15 +00:00
parent 166ff9161c
commit cdff720374
9 changed files with 52 additions and 9 deletions

View File

@ -48,7 +48,7 @@ public:
Columns releaseIndexColumns();
const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; }
virtual Block getColumnsSample() const = 0;
virtual const Block & getColumnsSample() const = 0;
protected:
SerializationPtr getSerialization(const String & column_name) const;

View File

@ -1129,6 +1129,8 @@ void MergeTask::VerticalMergeStage::finalizeVerticalMergeForOneColumn() const
ctx->executor.reset();
auto changed_checksums = ctx->column_to->fillChecksums(global_ctx->new_data_part, global_ctx->checksums_gathered_columns);
global_ctx->checksums_gathered_columns.add(std::move(changed_checksums));
const auto & columns_sample = ctx->column_to->getColumnsSample().getColumnsWithTypeAndName();
global_ctx->gathered_columns_samples.insert(global_ctx->gathered_columns_samples.end(), columns_sample.begin(), columns_sample.end());
ctx->delayed_streams.emplace_back(std::move(ctx->column_to));
@ -1274,7 +1276,7 @@ bool MergeTask::MergeProjectionsStage::finalizeProjectionsAndWholeMerge() const
if (global_ctx->chosen_merge_algorithm != MergeAlgorithm::Vertical)
global_ctx->to->finalizePart(global_ctx->new_data_part, ctx->need_sync);
else
global_ctx->to->finalizePart(global_ctx->new_data_part, ctx->need_sync, &global_ctx->storage_columns, &global_ctx->checksums_gathered_columns);
global_ctx->to->finalizePart(global_ctx->new_data_part, ctx->need_sync, &global_ctx->storage_columns, &global_ctx->checksums_gathered_columns, &global_ctx->gathered_columns_samples);
global_ctx->new_data_part->getDataPartStorage().precommitTransaction();
global_ctx->promise.set_value(global_ctx->new_data_part);

View File

@ -182,6 +182,7 @@ private:
NamesAndTypesList merging_columns{};
NamesAndTypesList storage_columns{};
MergeTreeData::DataPart::Checksums checksums_gathered_columns{};
ColumnsWithTypeAndName gathered_columns_samples{};
IndicesDescription merging_skip_indexes;
std::unordered_map<String, IndicesDescription> skip_indexes_by_column;

View File

@ -122,7 +122,9 @@ public:
written_offset_columns = written_offset_columns_;
}
Block getColumnsSample() const override { return block_sample; }
void cancel() noexcept override;
const Block & getColumnsSample() const override { return block_sample; }
protected:
/// Count index_granularity for block and store in `index_granularity`

View File

@ -150,16 +150,18 @@ void MergedBlockOutputStream::finalizePart(
const MergeTreeMutableDataPartPtr & new_part,
bool sync,
const NamesAndTypesList * total_columns_list,
MergeTreeData::DataPart::Checksums * additional_column_checksums)
MergeTreeData::DataPart::Checksums * additional_column_checksums,
ColumnsWithTypeAndName * additional_columns_samples)
{
finalizePartAsync(new_part, sync, total_columns_list, additional_column_checksums).finish();
finalizePartAsync(new_part, sync, total_columns_list, additional_column_checksums, additional_columns_samples).finish();
}
MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
const MergeTreeMutableDataPartPtr & new_part,
bool sync,
const NamesAndTypesList * total_columns_list,
MergeTreeData::DataPart::Checksums * additional_column_checksums)
MergeTreeData::DataPart::Checksums * additional_column_checksums,
ColumnsWithTypeAndName * additional_columns_samples)
{
/// Finish write and get checksums.
MergeTreeData::DataPart::Checksums checksums;
@ -205,7 +207,14 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
new_part->setBytesOnDisk(checksums.getTotalSizeOnDisk());
new_part->setBytesUncompressedOnDisk(checksums.getTotalSizeUncompressedOnDisk());
new_part->index_granularity = writer->getIndexGranularity();
new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(writer->getColumnsSample());
auto columns_sample = writer->getColumnsSample();
if (additional_columns_samples)
{
for (const auto & column : *additional_columns_samples)
columns_sample.insert(column);
}
new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(columns_sample);
/// In mutation, existing_rows_count is already calculated in PartMergerWriter
/// In merge situation, lightweight deleted rows was physically deleted, existing_rows_count equals rows_count

View File

@ -60,13 +60,15 @@ public:
const MergeTreeMutableDataPartPtr & new_part,
bool sync,
const NamesAndTypesList * total_columns_list = nullptr,
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr);
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr,
ColumnsWithTypeAndName * additional_columns_samples = nullptr);
void finalizePart(
const MergeTreeMutableDataPartPtr & new_part,
bool sync,
const NamesAndTypesList * total_columns_list = nullptr,
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr);
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr,
ColumnsWithTypeAndName * additional_columns_samples = nullptr);
private:
/** If `permutation` is given, it rearranges the values in the columns when writing.

View File

@ -30,6 +30,7 @@ public:
MergeTreeData::DataPart::Checksums
fillChecksums(MergeTreeData::MutableDataPartPtr & new_part, MergeTreeData::DataPart::Checksums & all_checksums);
const Block & getColumnsSample() const { return writer->getColumnsSample(); }
void finish(bool sync);
};

View File

@ -0,0 +1 @@
test 2000000 70 7 7

View File

@ -0,0 +1,25 @@
-- Tags: no-random-settings, no-fasttest
set allow_experimental_dynamic_type = 1;
set allow_experimental_json_type = 1;
drop table if exists test;
create table test (d Dynamic, json JSON) engine=MergeTree order by tuple() settings min_rows_for_wide_part=0, min_bytes_for_wide_part=0, vertical_merge_algorithm_min_rows_to_activate=1, vertical_merge_algorithm_min_columns_to_activate=0;
insert into test select number, '{"a" : 42, "b" : "Hello, World"}' from numbers(1000000);
insert into test select number, '{"a" : 42, "b" : "Hello, World"}' from numbers(1000000);
optimize table test final;
SELECT
`table`,
sum(rows) AS rows,
floor(sum(data_uncompressed_bytes) / (1024 * 1024)) AS data_size_uncompressed,
floor(sum(data_compressed_bytes) / (1024 * 1024)) AS data_size_compressed,
floor(sum(bytes_on_disk) / (1024 * 1024)) AS total_size_on_disk
FROM system.parts
WHERE active AND (database = currentDatabase()) AND (`table` = 'test')
GROUP BY `table`
ORDER BY `table` ASC;
drop table test;