diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index ceec13fc4a4..2af5a55047e 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -2242,18 +2242,18 @@ void IMergeTreeDataPart::checkConsistencyWithProjections(bool require_part_metad proj_part->checkConsistency(require_part_metadata); } -void IMergeTreeDataPart::calculateColumnsAndSecondaryIndicesSizesOnDisk() +void IMergeTreeDataPart::calculateColumnsAndSecondaryIndicesSizesOnDisk(std::optional columns_sample) { - calculateColumnsSizesOnDisk(); + calculateColumnsSizesOnDisk(columns_sample); calculateSecondaryIndicesSizesOnDisk(); } -void IMergeTreeDataPart::calculateColumnsSizesOnDisk() +void IMergeTreeDataPart::calculateColumnsSizesOnDisk(std::optional columns_sample) { if (getColumns().empty() || checksums.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot calculate columns sizes when columns or checksums are not initialized"); - calculateEachColumnSizes(columns_sizes, total_columns_size); + calculateEachColumnSizes(columns_sizes, total_columns_size, columns_sample); } void IMergeTreeDataPart::calculateSecondaryIndicesSizesOnDisk() @@ -2500,22 +2500,24 @@ ColumnPtr IMergeTreeDataPart::getColumnSample(const NameAndTypePair & column) co StorageMetadataPtr metadata_ptr = storage.getInMemoryMetadataPtr(); StorageSnapshotPtr storage_snapshot_ptr = std::make_shared(storage, metadata_ptr); + MergeTreeReaderSettings settings; + settings.can_read_part_without_marks = true; MergeTreeReaderPtr reader = getReader( cols, storage_snapshot_ptr, - MarkRanges{MarkRange(0, 1)}, + MarkRanges{MarkRange(0, total_mark)}, /*virtual_fields=*/ {}, /*uncompressed_cache=*/{}, storage.getContext()->getMarkCache().get(), std::make_shared(), - MergeTreeReaderSettings{}, + settings, ValueSizeMap{}, ReadBufferFromFileBase::ProfileCallback{}); Columns result; result.resize(1); - reader->readRows(0, 1, false, 0, result); + reader->readRows(0, total_mark, false, 0, result); return result[0]; } diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index 85ef0472ce7..591b88fddbf 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -423,7 +423,7 @@ public: bool shallParticipateInMerges(const StoragePolicyPtr & storage_policy) const; /// Calculate column and secondary indices sizes on disk. - void calculateColumnsAndSecondaryIndicesSizesOnDisk(); + void calculateColumnsAndSecondaryIndicesSizesOnDisk(std::optional columns_sample = std::nullopt); std::optional getRelativePathForPrefix(const String & prefix, bool detached = false, bool broken = false) const; @@ -628,7 +628,7 @@ protected: /// Fill each_columns_size and total_size with sizes from columns files on /// disk using columns and checksums. - virtual void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const = 0; + virtual void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional columns_sample) const = 0; std::optional getRelativePathForDetachedPart(const String & prefix, bool broken) const; @@ -710,7 +710,7 @@ private: void loadPartitionAndMinMaxIndex(); - void calculateColumnsSizesOnDisk(); + void calculateColumnsSizesOnDisk(std::optional columns_sample = std::nullopt); void calculateSecondaryIndicesSizesOnDisk(); diff --git a/src/Storages/MergeTree/IMergeTreeDataPartWriter.h b/src/Storages/MergeTree/IMergeTreeDataPartWriter.h index adaa4eddb98..c64633f89e7 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPartWriter.h +++ b/src/Storages/MergeTree/IMergeTreeDataPartWriter.h @@ -47,6 +47,8 @@ public: Columns releaseIndexColumns(); const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; } + virtual Block getColumnsSample() const = 0; + protected: SerializationPtr getSerialization(const String & column_name) const; diff --git a/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp index 2a125e0582b..aa2b4b0f396 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp @@ -81,7 +81,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartCompactWriter( } -void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*each_columns_size*/, ColumnSize & total_size) const +void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*each_columns_size*/, ColumnSize & total_size, std::optional /*columns_sample*/) const { auto bin_checksum = checksums.files.find(DATA_FILE_NAME_WITH_EXTENSION); if (bin_checksum != checksums.files.end()) diff --git a/src/Storages/MergeTree/MergeTreeDataPartCompact.h b/src/Storages/MergeTree/MergeTreeDataPartCompact.h index 1fb84424774..068147fea8d 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartCompact.h +++ b/src/Storages/MergeTree/MergeTreeDataPartCompact.h @@ -65,8 +65,8 @@ private: /// Loads marks index granularity into memory void loadIndexGranularity() override; - /// Compact parts doesn't support per column size, only total size - void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const override; + /// Compact parts don't support per column size, only total size + void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional columns_sample) const override; }; } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp index f2dc7232f6d..3e87cafc46d 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp @@ -82,7 +82,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartWideWriter( /// Takes into account the fact that several columns can e.g. share their .size substreams. /// When calculating totals these should be counted only once. ColumnSize MergeTreeDataPartWide::getColumnSizeImpl( - const NameAndTypePair & column, std::unordered_set * processed_substreams) const + const NameAndTypePair & column, std::unordered_set * processed_substreams, std::optional columns_sample) const { ColumnSize size; if (checksums.empty()) @@ -108,7 +108,7 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl( auto mrk_checksum = checksums.files.find(*stream_name + getMarksFileExtension()); if (mrk_checksum != checksums.files.end()) size.marks += mrk_checksum->second.file_size; - }); + }, column.type, columns_sample && columns_sample->has(column.name) ? columns_sample->getByName(column.name).column : getColumnSample(column)); return size; } @@ -328,12 +328,12 @@ std::optional MergeTreeDataPartWide::getFileNameForColumn(const NameAndT return filename; } -void MergeTreeDataPartWide::calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const +void MergeTreeDataPartWide::calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional columns_sample) const { std::unordered_set processed_substreams; for (const auto & column : columns) { - ColumnSize size = getColumnSizeImpl(column, &processed_substreams); + ColumnSize size = getColumnSizeImpl(column, &processed_substreams, columns_sample); each_columns_size[column.name] = size; total_size.add(size); diff --git a/src/Storages/MergeTree/MergeTreeDataPartWide.h b/src/Storages/MergeTree/MergeTreeDataPartWide.h index 7465e08b7c4..dc4d360587c 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWide.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWide.h @@ -60,9 +60,9 @@ private: /// Loads marks index granularity into memory void loadIndexGranularity() override; - ColumnSize getColumnSizeImpl(const NameAndTypePair & column, std::unordered_set * processed_substreams) const; + ColumnSize getColumnSizeImpl(const NameAndTypePair & column, std::unordered_set * processed_substreams, std::optional columns_sample) const; - void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const override; + void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional columns_sample) const override; }; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h index 8d84442981e..9a597cead5a 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h @@ -122,6 +122,8 @@ public: written_offset_columns = written_offset_columns_; } + Block getColumnsSample() const override { return block_sample; } + protected: /// Count index_granularity for block and store in `index_granularity` size_t computeIndexGranularity(const Block & block) const; diff --git a/src/Storages/MergeTree/MergeTreeReaderWide.cpp b/src/Storages/MergeTree/MergeTreeReaderWide.cpp index 898bf5a2933..3e4a2e19dc1 100644 --- a/src/Storages/MergeTree/MergeTreeReaderWide.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderWide.cpp @@ -172,7 +172,7 @@ size_t MergeTreeReaderWide::readRows( throw; } - if (column->empty()) + if (column->empty() && max_rows_to_read > 0) res_columns[pos] = nullptr; } diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp index 4ee68580d3f..790f86647da 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -205,7 +205,7 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync( new_part->setBytesOnDisk(checksums.getTotalSizeOnDisk()); new_part->setBytesUncompressedOnDisk(checksums.getTotalSizeUncompressedOnDisk()); new_part->index_granularity = writer->getIndexGranularity(); - new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(); + new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(writer->getColumnsSample()); /// In mutation, existing_rows_count is already calculated in PartMergerWriter /// In merge situation, lightweight deleted rows was physically deleted, existing_rows_count equals rows_count diff --git a/tests/queries/0_stateless/03262_column_sizes_with_dynamic_structure.reference b/tests/queries/0_stateless/03262_column_sizes_with_dynamic_structure.reference new file mode 100644 index 00000000000..5cab16ed96d --- /dev/null +++ b/tests/queries/0_stateless/03262_column_sizes_with_dynamic_structure.reference @@ -0,0 +1 @@ +test 10.00 million 352.87 MiB 39.43 MiB 39.45 MiB diff --git a/tests/queries/0_stateless/03262_column_sizes_with_dynamic_structure.sql b/tests/queries/0_stateless/03262_column_sizes_with_dynamic_structure.sql new file mode 100644 index 00000000000..099bbd5dd22 --- /dev/null +++ b/tests/queries/0_stateless/03262_column_sizes_with_dynamic_structure.sql @@ -0,0 +1,23 @@ +-- Tags: no-random-settings + +set allow_experimental_dynamic_type = 1; +set allow_experimental_json_type = 1; + + +drop table if exists test; +create table test (d Dynamic, json JSON) engine=MergeTree order by tuple() settings min_rows_for_wide_part=0, min_bytes_for_wide_part=1; +insert into test select number, '{"a" : 42, "b" : "Hello, World"}' from numbers(10000000); + +SELECT + `table`, + formatReadableQuantity(sum(rows)) AS rows, + formatReadableSize(sum(data_uncompressed_bytes)) AS data_size_uncompressed, + formatReadableSize(sum(data_compressed_bytes)) AS data_size_compressed, + formatReadableSize(sum(bytes_on_disk)) AS total_size_on_disk +FROM system.parts +WHERE active AND (database = currentDatabase()) AND (`table` = 'test') +GROUP BY `table` +ORDER BY `table` ASC; + +drop table test; +