mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-14 10:22:10 +00:00
Merge pull request #72744 from ClickHouse/backport/24.9/72211
Backport #72211 to 24.9: Fix calculating dynamic columns sizes on vertical merge
This commit is contained in:
commit
1859b7cc6f
@ -48,7 +48,7 @@ public:
|
||||
Columns releaseIndexColumns();
|
||||
const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; }
|
||||
|
||||
virtual Block getColumnsSample() const = 0;
|
||||
virtual const Block & getColumnsSample() const = 0;
|
||||
|
||||
protected:
|
||||
SerializationPtr getSerialization(const String & column_name) const;
|
||||
|
@ -1092,6 +1092,8 @@ void MergeTask::VerticalMergeStage::finalizeVerticalMergeForOneColumn() const
|
||||
ctx->executor.reset();
|
||||
auto changed_checksums = ctx->column_to->fillChecksums(global_ctx->new_data_part, global_ctx->checksums_gathered_columns);
|
||||
global_ctx->checksums_gathered_columns.add(std::move(changed_checksums));
|
||||
const auto & columns_sample = ctx->column_to->getColumnsSample().getColumnsWithTypeAndName();
|
||||
global_ctx->gathered_columns_samples.insert(global_ctx->gathered_columns_samples.end(), columns_sample.begin(), columns_sample.end());
|
||||
|
||||
ctx->delayed_streams.emplace_back(std::move(ctx->column_to));
|
||||
|
||||
@ -1237,7 +1239,7 @@ bool MergeTask::MergeProjectionsStage::finalizeProjectionsAndWholeMerge() const
|
||||
if (global_ctx->chosen_merge_algorithm != MergeAlgorithm::Vertical)
|
||||
global_ctx->to->finalizePart(global_ctx->new_data_part, ctx->need_sync);
|
||||
else
|
||||
global_ctx->to->finalizePart(global_ctx->new_data_part, ctx->need_sync, &global_ctx->storage_columns, &global_ctx->checksums_gathered_columns);
|
||||
global_ctx->to->finalizePart(global_ctx->new_data_part, ctx->need_sync, &global_ctx->storage_columns, &global_ctx->checksums_gathered_columns, &global_ctx->gathered_columns_samples);
|
||||
|
||||
global_ctx->new_data_part->getDataPartStorage().precommitTransaction();
|
||||
global_ctx->promise.set_value(global_ctx->new_data_part);
|
||||
|
@ -181,6 +181,7 @@ private:
|
||||
NamesAndTypesList merging_columns{};
|
||||
NamesAndTypesList storage_columns{};
|
||||
MergeTreeData::DataPart::Checksums checksums_gathered_columns{};
|
||||
ColumnsWithTypeAndName gathered_columns_samples{};
|
||||
|
||||
IndicesDescription merging_skip_indexes;
|
||||
std::unordered_map<String, IndicesDescription> skip_indexes_by_column;
|
||||
|
@ -122,7 +122,7 @@ public:
|
||||
written_offset_columns = written_offset_columns_;
|
||||
}
|
||||
|
||||
Block getColumnsSample() const override { return block_sample; }
|
||||
const Block & getColumnsSample() const override { return block_sample; }
|
||||
|
||||
protected:
|
||||
/// Count index_granularity for block and store in `index_granularity`
|
||||
|
@ -150,16 +150,18 @@ void MergedBlockOutputStream::finalizePart(
|
||||
const MergeTreeMutableDataPartPtr & new_part,
|
||||
bool sync,
|
||||
const NamesAndTypesList * total_columns_list,
|
||||
MergeTreeData::DataPart::Checksums * additional_column_checksums)
|
||||
MergeTreeData::DataPart::Checksums * additional_column_checksums,
|
||||
ColumnsWithTypeAndName * additional_columns_samples)
|
||||
{
|
||||
finalizePartAsync(new_part, sync, total_columns_list, additional_column_checksums).finish();
|
||||
finalizePartAsync(new_part, sync, total_columns_list, additional_column_checksums, additional_columns_samples).finish();
|
||||
}
|
||||
|
||||
MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
|
||||
const MergeTreeMutableDataPartPtr & new_part,
|
||||
bool sync,
|
||||
const NamesAndTypesList * total_columns_list,
|
||||
MergeTreeData::DataPart::Checksums * additional_column_checksums)
|
||||
MergeTreeData::DataPart::Checksums * additional_column_checksums,
|
||||
ColumnsWithTypeAndName * additional_columns_samples)
|
||||
{
|
||||
/// Finish write and get checksums.
|
||||
MergeTreeData::DataPart::Checksums checksums;
|
||||
@ -205,7 +207,14 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
|
||||
new_part->setBytesOnDisk(checksums.getTotalSizeOnDisk());
|
||||
new_part->setBytesUncompressedOnDisk(checksums.getTotalSizeUncompressedOnDisk());
|
||||
new_part->index_granularity = writer->getIndexGranularity();
|
||||
new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(writer->getColumnsSample());
|
||||
|
||||
auto columns_sample = writer->getColumnsSample();
|
||||
if (additional_columns_samples)
|
||||
{
|
||||
for (const auto & column : *additional_columns_samples)
|
||||
columns_sample.insert(column);
|
||||
}
|
||||
new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(columns_sample);
|
||||
|
||||
/// In mutation, existing_rows_count is already calculated in PartMergerWriter
|
||||
/// In merge situation, lightweight deleted rows was physically deleted, existing_rows_count equals rows_count
|
||||
|
@ -60,13 +60,15 @@ public:
|
||||
const MergeTreeMutableDataPartPtr & new_part,
|
||||
bool sync,
|
||||
const NamesAndTypesList * total_columns_list = nullptr,
|
||||
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr);
|
||||
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr,
|
||||
ColumnsWithTypeAndName * additional_columns_samples = nullptr);
|
||||
|
||||
void finalizePart(
|
||||
const MergeTreeMutableDataPartPtr & new_part,
|
||||
bool sync,
|
||||
const NamesAndTypesList * total_columns_list = nullptr,
|
||||
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr);
|
||||
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr,
|
||||
ColumnsWithTypeAndName * additional_columns_samples = nullptr);
|
||||
|
||||
private:
|
||||
/** If `permutation` is given, it rearranges the values in the columns when writing.
|
||||
|
@ -30,6 +30,7 @@ public:
|
||||
MergeTreeData::DataPart::Checksums
|
||||
fillChecksums(MergeTreeData::MutableDataPartPtr & new_part, MergeTreeData::DataPart::Checksums & all_checksums);
|
||||
|
||||
const Block & getColumnsSample() const { return writer->getColumnsSample(); }
|
||||
void finish(bool sync);
|
||||
};
|
||||
|
||||
|
@ -0,0 +1 @@
|
||||
test 2000000 70 7 7
|
@ -0,0 +1,25 @@
|
||||
-- Tags: no-random-settings, no-fasttest
|
||||
|
||||
set allow_experimental_dynamic_type = 1;
|
||||
set allow_experimental_json_type = 1;
|
||||
|
||||
|
||||
drop table if exists test;
|
||||
create table test (d Dynamic, json JSON) engine=MergeTree order by tuple() settings min_rows_for_wide_part=0, min_bytes_for_wide_part=0, vertical_merge_algorithm_min_rows_to_activate=1, vertical_merge_algorithm_min_columns_to_activate=0;
|
||||
insert into test select number, '{"a" : 42, "b" : "Hello, World"}' from numbers(1000000);
|
||||
insert into test select number, '{"a" : 42, "b" : "Hello, World"}' from numbers(1000000);
|
||||
optimize table test final;
|
||||
|
||||
SELECT
|
||||
`table`,
|
||||
sum(rows) AS rows,
|
||||
floor(sum(data_uncompressed_bytes) / (1024 * 1024)) AS data_size_uncompressed,
|
||||
floor(sum(data_compressed_bytes) / (1024 * 1024)) AS data_size_compressed,
|
||||
floor(sum(bytes_on_disk) / (1024 * 1024)) AS total_size_on_disk
|
||||
FROM system.parts
|
||||
WHERE active AND (database = currentDatabase()) AND (`table` = 'test')
|
||||
GROUP BY `table`
|
||||
ORDER BY `table` ASC;
|
||||
|
||||
drop table test;
|
||||
|
Loading…
Reference in New Issue
Block a user