From 020b69647a65dd740cddfbf62730f37de14a4eb8 Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 6 Nov 2024 15:15:29 +0000 Subject: [PATCH 1/5] Fix counting column size in wide part for Dynamid and JSON types --- .../MergeTree/MergeTreeDataPartWide.cpp | 2 +- .../MergeTree/MergeTreeReaderWide.cpp | 2 +- ...umn_sizes_with_dynamic_structure.reference | 1 + ...62_column_sizes_with_dynamic_structure.sql | 22 +++++++++++++++++++ 4 files changed, 25 insertions(+), 2 deletions(-) create mode 100644 tests/queries/0_stateless/03262_column_sizes_with_dynamic_structure.reference create mode 100644 tests/queries/0_stateless/03262_column_sizes_with_dynamic_structure.sql diff --git a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp index d6f213463f2..d8470ba8405 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp @@ -108,7 +108,7 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl( auto mrk_checksum = checksums.files.find(*stream_name + getMarksFileExtension()); if (mrk_checksum != checksums.files.end()) size.marks += mrk_checksum->second.file_size; - }); + }, column.type, getColumnSample(column)); return size; } diff --git a/src/Storages/MergeTree/MergeTreeReaderWide.cpp b/src/Storages/MergeTree/MergeTreeReaderWide.cpp index 77231d8d392..885bd1ded8c 100644 --- a/src/Storages/MergeTree/MergeTreeReaderWide.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderWide.cpp @@ -172,7 +172,7 @@ size_t MergeTreeReaderWide::readRows( throw; } - if (column->empty()) + if (column->empty() && max_rows_to_read > 0) res_columns[pos] = nullptr; } diff --git a/tests/queries/0_stateless/03262_column_sizes_with_dynamic_structure.reference b/tests/queries/0_stateless/03262_column_sizes_with_dynamic_structure.reference new file mode 100644 index 00000000000..5cab16ed96d --- /dev/null +++ b/tests/queries/0_stateless/03262_column_sizes_with_dynamic_structure.reference @@ -0,0 +1 @@ +test 10.00 million 352.87 MiB 39.43 MiB 39.45 MiB diff --git a/tests/queries/0_stateless/03262_column_sizes_with_dynamic_structure.sql b/tests/queries/0_stateless/03262_column_sizes_with_dynamic_structure.sql new file mode 100644 index 00000000000..21e6515fc99 --- /dev/null +++ b/tests/queries/0_stateless/03262_column_sizes_with_dynamic_structure.sql @@ -0,0 +1,22 @@ +-- Tags: no-random-settings + +set allow_experimental_dynamic_type = 1; +set allow_experimental_json_type = 1; + +drop table if exists test; +create table test (d Dynamic, json JSON) engine=MergeTree order by tuple() settings min_rows_for_wide_part=0, min_bytes_for_wide_part=1; +insert into test select number, '{"a" : 42, "b" : "Hello, World"}' from numbers(10000000); + +SELECT + `table`, + formatReadableQuantity(sum(rows)) AS rows, + formatReadableSize(sum(data_uncompressed_bytes)) AS data_size_uncompressed, + formatReadableSize(sum(data_compressed_bytes)) AS data_size_compressed, + formatReadableSize(sum(bytes_on_disk)) AS total_size_on_disk +FROM system.parts +WHERE active AND (database = currentDatabase()) AND (`table` = 'test') +GROUP BY `table` +ORDER BY `table` ASC; + +drop table test; + From ca23e5254c2cca5e6b3f4a9c7ccd65f70be42fc4 Mon Sep 17 00:00:00 2001 From: avogar Date: Thu, 7 Nov 2024 12:44:57 +0000 Subject: [PATCH 2/5] Fix for tmp parts --- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 20d7528d38a..fb934a77512 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -2501,7 +2501,7 @@ ColumnPtr IMergeTreeDataPart::getColumnSample(const NameAndTypePair & column) co { const size_t total_mark = getMarksCount(); /// If column doesn't have dynamic subcolumns or part has no data, just create column using it's type. - if (!column.type->hasDynamicSubcolumns() || !total_mark) + if (is_temp || !column.type->hasDynamicSubcolumns() || !total_mark) return column.type->createColumn(); /// Otherwise, read sample column with 0 rows from the part, so it will load dynamic structure. @@ -2510,22 +2510,24 @@ ColumnPtr IMergeTreeDataPart::getColumnSample(const NameAndTypePair & column) co StorageMetadataPtr metadata_ptr = storage.getInMemoryMetadataPtr(); StorageSnapshotPtr storage_snapshot_ptr = std::make_shared(storage, metadata_ptr); + MergeTreeReaderSettings settings; + settings.can_read_part_without_marks = true; MergeTreeReaderPtr reader = getReader( cols, storage_snapshot_ptr, - MarkRanges{MarkRange(0, 1)}, + MarkRanges{MarkRange(0, total_mark)}, /*virtual_fields=*/ {}, /*uncompressed_cache=*/{}, storage.getContext()->getMarkCache().get(), std::make_shared(), - MergeTreeReaderSettings{}, + settings, ValueSizeMap{}, ReadBufferFromFileBase::ProfileCallback{}); Columns result; result.resize(1); - reader->readRows(0, 1, false, 0, result); + reader->readRows(0, total_mark, false, 0, result); return result[0]; } From 1157028e35139ead00c7475045795bde966a2e40 Mon Sep 17 00:00:00 2001 From: avogar Date: Thu, 7 Nov 2024 19:53:30 +0000 Subject: [PATCH 3/5] Fix getting column sample for not finalized part --- src/Columns/ColumnVariant.cpp | 2 +- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 11 ++++++----- src/Storages/MergeTree/IMergeTreeDataPart.h | 6 +++--- src/Storages/MergeTree/IMergeTreeDataPartWriter.h | 2 ++ src/Storages/MergeTree/MergeTreeDataPartCompact.cpp | 2 +- src/Storages/MergeTree/MergeTreeDataPartCompact.h | 2 +- src/Storages/MergeTree/MergeTreeDataPartWide.cpp | 8 ++++---- src/Storages/MergeTree/MergeTreeDataPartWide.h | 4 ++-- .../MergeTree/MergeTreeDataPartWriterOnDisk.h | 2 ++ src/Storages/MergeTree/MergedBlockOutputStream.cpp | 2 +- 10 files changed, 23 insertions(+), 18 deletions(-) diff --git a/src/Columns/ColumnVariant.cpp b/src/Columns/ColumnVariant.cpp index 54f0421fc4b..2fa59b8e33c 100644 --- a/src/Columns/ColumnVariant.cpp +++ b/src/Columns/ColumnVariant.cpp @@ -952,7 +952,7 @@ ColumnPtr ColumnVariant::permute(const Permutation & perm, size_t limit) const if (hasOnlyNulls()) { if (limit) - return cloneResized(limit); + return cloneResized(limit ? std::min(size(), limit) : size()); /// If no limit, we can just return current immutable column. return this->getPtr(); diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index b631d991e90..f73b52dbafd 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -2252,18 +2252,18 @@ void IMergeTreeDataPart::checkConsistencyWithProjections(bool require_part_metad proj_part->checkConsistency(require_part_metadata); } -void IMergeTreeDataPart::calculateColumnsAndSecondaryIndicesSizesOnDisk() +void IMergeTreeDataPart::calculateColumnsAndSecondaryIndicesSizesOnDisk(std::optional columns_sample) { - calculateColumnsSizesOnDisk(); + calculateColumnsSizesOnDisk(columns_sample); calculateSecondaryIndicesSizesOnDisk(); } -void IMergeTreeDataPart::calculateColumnsSizesOnDisk() +void IMergeTreeDataPart::calculateColumnsSizesOnDisk(std::optional columns_sample) { if (getColumns().empty() || checksums.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot calculate columns sizes when columns or checksums are not initialized"); - calculateEachColumnSizes(columns_sizes, total_columns_size); + calculateEachColumnSizes(columns_sizes, total_columns_size, columns_sample); } void IMergeTreeDataPart::calculateSecondaryIndicesSizesOnDisk() @@ -2501,7 +2501,7 @@ ColumnPtr IMergeTreeDataPart::getColumnSample(const NameAndTypePair & column) co { const size_t total_mark = getMarksCount(); /// If column doesn't have dynamic subcolumns or part has no data, just create column using it's type. - if (is_temp || !column.type->hasDynamicSubcolumns() || !total_mark) + if (!column.type->hasDynamicSubcolumns() || !total_mark) return column.type->createColumn(); /// Otherwise, read sample column with 0 rows from the part, so it will load dynamic structure. @@ -2527,6 +2527,7 @@ ColumnPtr IMergeTreeDataPart::getColumnSample(const NameAndTypePair & column) co Columns result; result.resize(1); + LOG_DEBUG(getLogger("IMergeTreeDataPart"), "getColumnSample"); reader->readRows(0, total_mark, false, 0, result); return result[0]; } diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index b41a1d840e1..a7051a2491a 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -426,7 +426,7 @@ public: bool shallParticipateInMerges(const StoragePolicyPtr & storage_policy) const; /// Calculate column and secondary indices sizes on disk. - void calculateColumnsAndSecondaryIndicesSizesOnDisk(); + void calculateColumnsAndSecondaryIndicesSizesOnDisk(std::optional columns_sample = std::nullopt); std::optional getRelativePathForPrefix(const String & prefix, bool detached = false, bool broken = false) const; @@ -631,7 +631,7 @@ protected: /// Fill each_columns_size and total_size with sizes from columns files on /// disk using columns and checksums. - virtual void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const = 0; + virtual void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional columns_sample) const = 0; std::optional getRelativePathForDetachedPart(const String & prefix, bool broken) const; @@ -713,7 +713,7 @@ private: void loadPartitionAndMinMaxIndex(); - void calculateColumnsSizesOnDisk(); + void calculateColumnsSizesOnDisk(std::optional columns_sample = std::nullopt); void calculateSecondaryIndicesSizesOnDisk(); diff --git a/src/Storages/MergeTree/IMergeTreeDataPartWriter.h b/src/Storages/MergeTree/IMergeTreeDataPartWriter.h index d1c76505d7c..8923f6a59ca 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPartWriter.h +++ b/src/Storages/MergeTree/IMergeTreeDataPartWriter.h @@ -54,6 +54,8 @@ public: const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; } + virtual Block getColumnsSample() const = 0; + protected: SerializationPtr getSerialization(const String & column_name) const; diff --git a/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp index 14c2da82de1..8856f467b90 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp @@ -80,7 +80,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartCompactWriter( } -void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*each_columns_size*/, ColumnSize & total_size) const +void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*each_columns_size*/, ColumnSize & total_size, std::optional /*columns_sample*/) const { auto bin_checksum = checksums.files.find(DATA_FILE_NAME_WITH_EXTENSION); if (bin_checksum != checksums.files.end()) diff --git a/src/Storages/MergeTree/MergeTreeDataPartCompact.h b/src/Storages/MergeTree/MergeTreeDataPartCompact.h index 8e279571578..c394de0d7c1 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartCompact.h +++ b/src/Storages/MergeTree/MergeTreeDataPartCompact.h @@ -70,7 +70,7 @@ private: void loadIndexGranularity() override; /// Compact parts don't support per column size, only total size - void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const override; + void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional columns_sample) const override; }; } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp index b3b6a0dded6..39f96ba06ad 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp @@ -82,7 +82,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartWideWriter( /// Takes into account the fact that several columns can e.g. share their .size substreams. /// When calculating totals these should be counted only once. ColumnSize MergeTreeDataPartWide::getColumnSizeImpl( - const NameAndTypePair & column, std::unordered_set * processed_substreams) const + const NameAndTypePair & column, std::unordered_set * processed_substreams, std::optional columns_sample) const { ColumnSize size; if (checksums.empty()) @@ -108,7 +108,7 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl( auto mrk_checksum = checksums.files.find(*stream_name + getMarksFileExtension()); if (mrk_checksum != checksums.files.end()) size.marks += mrk_checksum->second.file_size; - }, column.type, getColumnSample(column)); + }, column.type, columns_sample && columns_sample->has(column.name) ? columns_sample->getByName(column.name).column : getColumnSample(column)); return size; } @@ -374,12 +374,12 @@ std::optional MergeTreeDataPartWide::getFileNameForColumn(const NameAndT return filename; } -void MergeTreeDataPartWide::calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const +void MergeTreeDataPartWide::calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional columns_sample) const { std::unordered_set processed_substreams; for (const auto & column : columns) { - ColumnSize size = getColumnSizeImpl(column, &processed_substreams); + ColumnSize size = getColumnSizeImpl(column, &processed_substreams, columns_sample); each_columns_size[column.name] = size; total_size.add(size); diff --git a/src/Storages/MergeTree/MergeTreeDataPartWide.h b/src/Storages/MergeTree/MergeTreeDataPartWide.h index 022a5fb746c..a6d4897ed87 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWide.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWide.h @@ -64,9 +64,9 @@ private: /// Loads marks index granularity into memory void loadIndexGranularity() override; - ColumnSize getColumnSizeImpl(const NameAndTypePair & column, std::unordered_set * processed_substreams) const; + ColumnSize getColumnSizeImpl(const NameAndTypePair & column, std::unordered_set * processed_substreams, std::optional columns_sample) const; - void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const override; + void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional columns_sample) const override; }; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h index 49d654c15e1..b22d58ba51e 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h @@ -123,6 +123,8 @@ public: written_offset_columns = written_offset_columns_; } + Block getColumnsSample() const override { return block_sample; } + protected: /// Count index_granularity for block and store in `index_granularity` size_t computeIndexGranularity(const Block & block) const; diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp index 77c34aae30a..604b2fda20a 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -207,7 +207,7 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync( new_part->setBytesOnDisk(checksums.getTotalSizeOnDisk()); new_part->setBytesUncompressedOnDisk(checksums.getTotalSizeUncompressedOnDisk()); new_part->index_granularity = writer->getIndexGranularity(); - new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(); + new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(writer->getColumnsSample()); /// In mutation, existing_rows_count is already calculated in PartMergerWriter /// In merge situation, lightweight deleted rows was physically deleted, existing_rows_count equals rows_count From 0ff0c96b007108ab222a264e4a3bf8aa7cb7a18e Mon Sep 17 00:00:00 2001 From: avogar Date: Thu, 7 Nov 2024 20:01:40 +0000 Subject: [PATCH 4/5] Remove logging --- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index f73b52dbafd..4e400fb1f94 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -2527,7 +2527,6 @@ ColumnPtr IMergeTreeDataPart::getColumnSample(const NameAndTypePair & column) co Columns result; result.resize(1); - LOG_DEBUG(getLogger("IMergeTreeDataPart"), "getColumnSample"); reader->readRows(0, total_mark, false, 0, result); return result[0]; } From 43456d74476c99ebdb25ae74a9e6f471421f3191 Mon Sep 17 00:00:00 2001 From: Pavel Kruglov <48961922+Avogar@users.noreply.github.com> Date: Tue, 12 Nov 2024 16:30:57 +0100 Subject: [PATCH 5/5] Restart CI --- .../0_stateless/03262_column_sizes_with_dynamic_structure.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/queries/0_stateless/03262_column_sizes_with_dynamic_structure.sql b/tests/queries/0_stateless/03262_column_sizes_with_dynamic_structure.sql index 21e6515fc99..099bbd5dd22 100644 --- a/tests/queries/0_stateless/03262_column_sizes_with_dynamic_structure.sql +++ b/tests/queries/0_stateless/03262_column_sizes_with_dynamic_structure.sql @@ -3,6 +3,7 @@ set allow_experimental_dynamic_type = 1; set allow_experimental_json_type = 1; + drop table if exists test; create table test (d Dynamic, json JSON) engine=MergeTree order by tuple() settings min_rows_for_wide_part=0, min_bytes_for_wide_part=1; insert into test select number, '{"a" : 42, "b" : "Hello, World"}' from numbers(10000000);