diff --git a/src/Storages/MergeTree/IMergeTreeDataPartWriter.h b/src/Storages/MergeTree/IMergeTreeDataPartWriter.h index bead5fb515f..fb221544c41 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPartWriter.h +++ b/src/Storages/MergeTree/IMergeTreeDataPartWriter.h @@ -48,7 +48,7 @@ public: Columns releaseIndexColumns(); const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; } - virtual Block getColumnsSample() const = 0; + virtual const Block & getColumnsSample() const = 0; protected: SerializationPtr getSerialization(const String & column_name) const; diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 4dd8400ec96..45b59ef70ce 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -1092,6 +1092,8 @@ void MergeTask::VerticalMergeStage::finalizeVerticalMergeForOneColumn() const ctx->executor.reset(); auto changed_checksums = ctx->column_to->fillChecksums(global_ctx->new_data_part, global_ctx->checksums_gathered_columns); global_ctx->checksums_gathered_columns.add(std::move(changed_checksums)); + const auto & columns_sample = ctx->column_to->getColumnsSample().getColumnsWithTypeAndName(); + global_ctx->gathered_columns_samples.insert(global_ctx->gathered_columns_samples.end(), columns_sample.begin(), columns_sample.end()); ctx->delayed_streams.emplace_back(std::move(ctx->column_to)); @@ -1237,7 +1239,7 @@ bool MergeTask::MergeProjectionsStage::finalizeProjectionsAndWholeMerge() const if (global_ctx->chosen_merge_algorithm != MergeAlgorithm::Vertical) global_ctx->to->finalizePart(global_ctx->new_data_part, ctx->need_sync); else - global_ctx->to->finalizePart(global_ctx->new_data_part, ctx->need_sync, &global_ctx->storage_columns, &global_ctx->checksums_gathered_columns); + global_ctx->to->finalizePart(global_ctx->new_data_part, ctx->need_sync, &global_ctx->storage_columns, &global_ctx->checksums_gathered_columns, &global_ctx->gathered_columns_samples); global_ctx->new_data_part->getDataPartStorage().precommitTransaction(); global_ctx->promise.set_value(global_ctx->new_data_part); diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index 29b5c4452e7..53cc904c223 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -181,6 +181,7 @@ private: NamesAndTypesList merging_columns{}; NamesAndTypesList storage_columns{}; MergeTreeData::DataPart::Checksums checksums_gathered_columns{}; + ColumnsWithTypeAndName gathered_columns_samples{}; IndicesDescription merging_skip_indexes; std::unordered_map skip_indexes_by_column; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h index 340b7eb9bdd..697282a70d6 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h @@ -122,7 +122,9 @@ public: written_offset_columns = written_offset_columns_; } - Block getColumnsSample() const override { return block_sample; } + void cancel() noexcept override; + + const Block & getColumnsSample() const override { return block_sample; } protected: /// Count index_granularity for block and store in `index_granularity` diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp index 790f86647da..0f2dfb73b34 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -150,16 +150,18 @@ void MergedBlockOutputStream::finalizePart( const MergeTreeMutableDataPartPtr & new_part, bool sync, const NamesAndTypesList * total_columns_list, - MergeTreeData::DataPart::Checksums * additional_column_checksums) + MergeTreeData::DataPart::Checksums * additional_column_checksums, + ColumnsWithTypeAndName * additional_columns_samples) { - finalizePartAsync(new_part, sync, total_columns_list, additional_column_checksums).finish(); + finalizePartAsync(new_part, sync, total_columns_list, additional_column_checksums, additional_columns_samples).finish(); } MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync( const MergeTreeMutableDataPartPtr & new_part, bool sync, const NamesAndTypesList * total_columns_list, - MergeTreeData::DataPart::Checksums * additional_column_checksums) + MergeTreeData::DataPart::Checksums * additional_column_checksums, + ColumnsWithTypeAndName * additional_columns_samples) { /// Finish write and get checksums. MergeTreeData::DataPart::Checksums checksums; @@ -205,7 +207,14 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync( new_part->setBytesOnDisk(checksums.getTotalSizeOnDisk()); new_part->setBytesUncompressedOnDisk(checksums.getTotalSizeUncompressedOnDisk()); new_part->index_granularity = writer->getIndexGranularity(); - new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(writer->getColumnsSample()); + + auto columns_sample = writer->getColumnsSample(); + if (additional_columns_samples) + { + for (const auto & column : *additional_columns_samples) + columns_sample.insert(column); + } + new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(columns_sample); /// In mutation, existing_rows_count is already calculated in PartMergerWriter /// In merge situation, lightweight deleted rows was physically deleted, existing_rows_count equals rows_count diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.h b/src/Storages/MergeTree/MergedBlockOutputStream.h index e212fe5bb5a..56716d8b9b4 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.h +++ b/src/Storages/MergeTree/MergedBlockOutputStream.h @@ -60,13 +60,15 @@ public: const MergeTreeMutableDataPartPtr & new_part, bool sync, const NamesAndTypesList * total_columns_list = nullptr, - MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr); + MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr, + ColumnsWithTypeAndName * additional_columns_samples = nullptr); void finalizePart( const MergeTreeMutableDataPartPtr & new_part, bool sync, const NamesAndTypesList * total_columns_list = nullptr, - MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr); + MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr, + ColumnsWithTypeAndName * additional_columns_samples = nullptr); private: /** If `permutation` is given, it rearranges the values in the columns when writing. diff --git a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h index e837a62743e..4ee94f473de 100644 --- a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h +++ b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h @@ -30,6 +30,7 @@ public: MergeTreeData::DataPart::Checksums fillChecksums(MergeTreeData::MutableDataPartPtr & new_part, MergeTreeData::DataPart::Checksums & all_checksums); + const Block & getColumnsSample() const { return writer->getColumnsSample(); } void finish(bool sync); }; diff --git a/tests/queries/0_stateless/03274_dynamic_column_sizes_vertical_merge.reference b/tests/queries/0_stateless/03274_dynamic_column_sizes_vertical_merge.reference new file mode 100644 index 00000000000..777c6e539df --- /dev/null +++ b/tests/queries/0_stateless/03274_dynamic_column_sizes_vertical_merge.reference @@ -0,0 +1 @@ +test 2000000 70 7 7 diff --git a/tests/queries/0_stateless/03274_dynamic_column_sizes_vertical_merge.sql b/tests/queries/0_stateless/03274_dynamic_column_sizes_vertical_merge.sql new file mode 100644 index 00000000000..b3b1c080114 --- /dev/null +++ b/tests/queries/0_stateless/03274_dynamic_column_sizes_vertical_merge.sql @@ -0,0 +1,25 @@ +-- Tags: no-random-settings, no-fasttest + +set allow_experimental_dynamic_type = 1; +set allow_experimental_json_type = 1; + + +drop table if exists test; +create table test (d Dynamic, json JSON) engine=MergeTree order by tuple() settings min_rows_for_wide_part=0, min_bytes_for_wide_part=0, vertical_merge_algorithm_min_rows_to_activate=1, vertical_merge_algorithm_min_columns_to_activate=0; +insert into test select number, '{"a" : 42, "b" : "Hello, World"}' from numbers(1000000); +insert into test select number, '{"a" : 42, "b" : "Hello, World"}' from numbers(1000000); +optimize table test final; + +SELECT + `table`, + sum(rows) AS rows, + floor(sum(data_uncompressed_bytes) / (1024 * 1024)) AS data_size_uncompressed, + floor(sum(data_compressed_bytes) / (1024 * 1024)) AS data_size_compressed, + floor(sum(bytes_on_disk) / (1024 * 1024)) AS total_size_on_disk +FROM system.parts +WHERE active AND (database = currentDatabase()) AND (`table` = 'test') +GROUP BY `table` +ORDER BY `table` ASC; + +drop table test; +