Merge pull request #71526 from Avogar/fix-dynamic-sizes

Fix counting column size in wide part for Dynamic and JSON types
This commit is contained in:
Pavel Kruglov 2024-11-13 15:15:24 +00:00 committed by GitHub
commit f5c91bf809
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 50 additions and 20 deletions

View File

@ -2263,18 +2263,18 @@ void IMergeTreeDataPart::checkConsistencyWithProjections(bool require_part_metad
proj_part->checkConsistency(require_part_metadata); proj_part->checkConsistency(require_part_metadata);
} }
void IMergeTreeDataPart::calculateColumnsAndSecondaryIndicesSizesOnDisk() void IMergeTreeDataPart::calculateColumnsAndSecondaryIndicesSizesOnDisk(std::optional<Block> columns_sample)
{ {
calculateColumnsSizesOnDisk(); calculateColumnsSizesOnDisk(columns_sample);
calculateSecondaryIndicesSizesOnDisk(); calculateSecondaryIndicesSizesOnDisk();
} }
void IMergeTreeDataPart::calculateColumnsSizesOnDisk() void IMergeTreeDataPart::calculateColumnsSizesOnDisk(std::optional<Block> columns_sample)
{ {
if (getColumns().empty() || checksums.empty()) if (getColumns().empty() || checksums.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot calculate columns sizes when columns or checksums are not initialized"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot calculate columns sizes when columns or checksums are not initialized");
calculateEachColumnSizes(columns_sizes, total_columns_size); calculateEachColumnSizes(columns_sizes, total_columns_size, columns_sample);
} }
void IMergeTreeDataPart::calculateSecondaryIndicesSizesOnDisk() void IMergeTreeDataPart::calculateSecondaryIndicesSizesOnDisk()
@ -2521,22 +2521,24 @@ ColumnPtr IMergeTreeDataPart::getColumnSample(const NameAndTypePair & column) co
StorageMetadataPtr metadata_ptr = storage.getInMemoryMetadataPtr(); StorageMetadataPtr metadata_ptr = storage.getInMemoryMetadataPtr();
StorageSnapshotPtr storage_snapshot_ptr = std::make_shared<StorageSnapshot>(storage, metadata_ptr); StorageSnapshotPtr storage_snapshot_ptr = std::make_shared<StorageSnapshot>(storage, metadata_ptr);
MergeTreeReaderSettings settings;
settings.can_read_part_without_marks = true;
MergeTreeReaderPtr reader = getReader( MergeTreeReaderPtr reader = getReader(
cols, cols,
storage_snapshot_ptr, storage_snapshot_ptr,
MarkRanges{MarkRange(0, 1)}, MarkRanges{MarkRange(0, total_mark)},
/*virtual_fields=*/ {}, /*virtual_fields=*/ {},
/*uncompressed_cache=*/{}, /*uncompressed_cache=*/{},
storage.getContext()->getMarkCache().get(), storage.getContext()->getMarkCache().get(),
std::make_shared<AlterConversions>(), std::make_shared<AlterConversions>(),
MergeTreeReaderSettings{}, settings,
ValueSizeMap{}, ValueSizeMap{},
ReadBufferFromFileBase::ProfileCallback{}); ReadBufferFromFileBase::ProfileCallback{});
Columns result; Columns result;
result.resize(1); result.resize(1);
reader->readRows(0, 1, false, 0, result); reader->readRows(0, total_mark, false, 0, result);
return result[0]; return result[0];
} }

View File

@ -428,7 +428,7 @@ public:
bool shallParticipateInMerges(const StoragePolicyPtr & storage_policy) const; bool shallParticipateInMerges(const StoragePolicyPtr & storage_policy) const;
/// Calculate column and secondary indices sizes on disk. /// Calculate column and secondary indices sizes on disk.
void calculateColumnsAndSecondaryIndicesSizesOnDisk(); void calculateColumnsAndSecondaryIndicesSizesOnDisk(std::optional<Block> columns_sample = std::nullopt);
std::optional<String> getRelativePathForPrefix(const String & prefix, bool detached = false, bool broken = false) const; std::optional<String> getRelativePathForPrefix(const String & prefix, bool detached = false, bool broken = false) const;
@ -633,7 +633,7 @@ protected:
/// Fill each_columns_size and total_size with sizes from columns files on /// Fill each_columns_size and total_size with sizes from columns files on
/// disk using columns and checksums. /// disk using columns and checksums.
virtual void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const = 0; virtual void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional<Block> columns_sample) const = 0;
std::optional<String> getRelativePathForDetachedPart(const String & prefix, bool broken) const; std::optional<String> getRelativePathForDetachedPart(const String & prefix, bool broken) const;
@ -715,7 +715,7 @@ private:
void loadPartitionAndMinMaxIndex(); void loadPartitionAndMinMaxIndex();
void calculateColumnsSizesOnDisk(); void calculateColumnsSizesOnDisk(std::optional<Block> columns_sample = std::nullopt);
void calculateSecondaryIndicesSizesOnDisk(); void calculateSecondaryIndicesSizesOnDisk();

View File

@ -54,6 +54,8 @@ public:
const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; } const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; }
virtual Block getColumnsSample() const = 0;
protected: protected:
SerializationPtr getSerialization(const String & column_name) const; SerializationPtr getSerialization(const String & column_name) const;

View File

@ -80,7 +80,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartCompactWriter(
} }
void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*each_columns_size*/, ColumnSize & total_size) const void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*each_columns_size*/, ColumnSize & total_size, std::optional<Block> /*columns_sample*/) const
{ {
auto bin_checksum = checksums.files.find(DATA_FILE_NAME_WITH_EXTENSION); auto bin_checksum = checksums.files.find(DATA_FILE_NAME_WITH_EXTENSION);
if (bin_checksum != checksums.files.end()) if (bin_checksum != checksums.files.end())

View File

@ -70,7 +70,7 @@ private:
void loadIndexGranularity() override; void loadIndexGranularity() override;
/// Compact parts don't support per column size, only total size /// Compact parts don't support per column size, only total size
void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const override; void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional<Block> columns_sample) const override;
}; };
} }

View File

@ -82,7 +82,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartWideWriter(
/// Takes into account the fact that several columns can e.g. share their .size substreams. /// Takes into account the fact that several columns can e.g. share their .size substreams.
/// When calculating totals these should be counted only once. /// When calculating totals these should be counted only once.
ColumnSize MergeTreeDataPartWide::getColumnSizeImpl( ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
const NameAndTypePair & column, std::unordered_set<String> * processed_substreams) const const NameAndTypePair & column, std::unordered_set<String> * processed_substreams, std::optional<Block> columns_sample) const
{ {
ColumnSize size; ColumnSize size;
if (checksums.empty()) if (checksums.empty())
@ -108,7 +108,7 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
auto mrk_checksum = checksums.files.find(*stream_name + getMarksFileExtension()); auto mrk_checksum = checksums.files.find(*stream_name + getMarksFileExtension());
if (mrk_checksum != checksums.files.end()) if (mrk_checksum != checksums.files.end())
size.marks += mrk_checksum->second.file_size; size.marks += mrk_checksum->second.file_size;
}); }, column.type, columns_sample && columns_sample->has(column.name) ? columns_sample->getByName(column.name).column : getColumnSample(column));
return size; return size;
} }
@ -374,12 +374,12 @@ std::optional<String> MergeTreeDataPartWide::getFileNameForColumn(const NameAndT
return filename; return filename;
} }
void MergeTreeDataPartWide::calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const void MergeTreeDataPartWide::calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional<Block> columns_sample) const
{ {
std::unordered_set<String> processed_substreams; std::unordered_set<String> processed_substreams;
for (const auto & column : columns) for (const auto & column : columns)
{ {
ColumnSize size = getColumnSizeImpl(column, &processed_substreams); ColumnSize size = getColumnSizeImpl(column, &processed_substreams, columns_sample);
each_columns_size[column.name] = size; each_columns_size[column.name] = size;
total_size.add(size); total_size.add(size);

View File

@ -64,9 +64,9 @@ private:
/// Loads marks index granularity into memory /// Loads marks index granularity into memory
void loadIndexGranularity() override; void loadIndexGranularity() override;
ColumnSize getColumnSizeImpl(const NameAndTypePair & column, std::unordered_set<String> * processed_substreams) const; ColumnSize getColumnSizeImpl(const NameAndTypePair & column, std::unordered_set<String> * processed_substreams, std::optional<Block> columns_sample) const;
void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const override; void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional<Block> columns_sample) const override;
}; };

View File

@ -123,6 +123,8 @@ public:
written_offset_columns = written_offset_columns_; written_offset_columns = written_offset_columns_;
} }
Block getColumnsSample() const override { return block_sample; }
protected: protected:
/// Count index_granularity for block and store in `index_granularity` /// Count index_granularity for block and store in `index_granularity`
size_t computeIndexGranularity(const Block & block) const; size_t computeIndexGranularity(const Block & block) const;

View File

@ -172,7 +172,7 @@ size_t MergeTreeReaderWide::readRows(
throw; throw;
} }
if (column->empty()) if (column->empty() && max_rows_to_read > 0)
res_columns[pos] = nullptr; res_columns[pos] = nullptr;
} }

View File

@ -209,7 +209,7 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
new_part->index_granularity = writer->getIndexGranularity(); new_part->index_granularity = writer->getIndexGranularity();
/// Just in case /// Just in case
new_part->index_granularity.shrinkToFitInMemory(); new_part->index_granularity.shrinkToFitInMemory();
new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(); new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(writer->getColumnsSample());
/// In mutation, existing_rows_count is already calculated in PartMergerWriter /// In mutation, existing_rows_count is already calculated in PartMergerWriter
/// In merge situation, lightweight deleted rows was physically deleted, existing_rows_count equals rows_count /// In merge situation, lightweight deleted rows was physically deleted, existing_rows_count equals rows_count

View File

@ -0,0 +1 @@
test 10.00 million 352.87 MiB 39.43 MiB 39.45 MiB

View File

@ -0,0 +1,23 @@
-- Tags: no-random-settings
set allow_experimental_dynamic_type = 1;
set allow_experimental_json_type = 1;
drop table if exists test;
create table test (d Dynamic, json JSON) engine=MergeTree order by tuple() settings min_rows_for_wide_part=0, min_bytes_for_wide_part=1;
insert into test select number, '{"a" : 42, "b" : "Hello, World"}' from numbers(10000000);
SELECT
`table`,
formatReadableQuantity(sum(rows)) AS rows,
formatReadableSize(sum(data_uncompressed_bytes)) AS data_size_uncompressed,
formatReadableSize(sum(data_compressed_bytes)) AS data_size_compressed,
formatReadableSize(sum(bytes_on_disk)) AS total_size_on_disk
FROM system.parts
WHERE active AND (database = currentDatabase()) AND (`table` = 'test')
GROUP BY `table`
ORDER BY `table` ASC;
drop table test;