From 6bc814a860cbf7fa69e29d847279aca481da1ee7 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 5 Sep 2022 04:47:32 +0200 Subject: [PATCH 1/5] Renaming --- .../MergeTreeDataPartWriterCompact.cpp | 18 +++++++++--------- .../MergeTree/MergeTreeDataPartWriterCompact.h | 2 +- .../MergeTreeDataPartWriterOnDisk.cpp | 12 ++++++------ .../MergeTree/MergeTreeDataPartWriterOnDisk.h | 2 +- .../MergeTree/MergeTreeDataPartWriterWide.cpp | 6 +++--- .../MergeTree/MergeTreeMarksLoader.cpp | 6 +++--- src/Storages/MergeTree/MergeTreeMarksLoader.h | 2 +- 7 files changed, 24 insertions(+), 24 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp index 8c47c99a49a..bfccb3f8709 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp @@ -34,7 +34,7 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact( , marks_hashing(*marks_file) , marks_compressed_buf(marks_hashing, settings_.getMarksCompressionCodec(), settings_.marks_compress_block_size) , marks_compressed(marks_compressed_buf) - , is_compress_marks(isCompressedFromMrkExtension(marks_file_extension)) + , compress_marks(isCompressedFromMrkExtension(marks_file_extension)) { const auto & storage_columns = metadata_snapshot->getColumns(); for (const auto & column : columns_list) @@ -207,8 +207,8 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G }; - writeIntBinary(plain_hashing.count(), is_compress_marks ? marks_compressed : marks_hashing); - writeIntBinary(static_cast(0), is_compress_marks ? marks_compressed : marks_hashing); + writeIntBinary(plain_hashing.count(), compress_marks ? marks_compressed : marks_hashing); + writeIntBinary(static_cast(0), compress_marks ? marks_compressed : marks_hashing); writeColumnSingleGranule( block.getByName(name_and_type->name), data_part->getSerialization(name_and_type->name), @@ -218,7 +218,7 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G prev_stream->hashing_buf.next(); //-V522 } - writeIntBinary(granule.rows_to_write, is_compress_marks ? marks_compressed : marks_hashing); + writeIntBinary(granule.rows_to_write, compress_marks ? marks_compressed : marks_hashing); } } @@ -247,14 +247,14 @@ void MergeTreeDataPartWriterCompact::fillDataChecksums(IMergeTreeDataPart::Check { for (size_t i = 0; i < columns_list.size(); ++i) { - writeIntBinary(plain_hashing.count(), is_compress_marks ? marks_compressed : marks_hashing); - writeIntBinary(static_cast(0), is_compress_marks ? marks_compressed : marks_hashing); + writeIntBinary(plain_hashing.count(), compress_marks ? marks_compressed : marks_hashing); + writeIntBinary(static_cast(0), compress_marks ? marks_compressed : marks_hashing); } - writeIntBinary(static_cast(0), is_compress_marks ? marks_compressed : marks_hashing); + writeIntBinary(static_cast(0), compress_marks ? marks_compressed : marks_hashing); } plain_file->next(); - if (is_compress_marks) + if (compress_marks) marks_compressed.next(); marks_hashing.next(); @@ -339,7 +339,7 @@ void MergeTreeDataPartWriterCompact::addToChecksums(MergeTreeDataPartChecksums & checksums.files[data_file_name].file_size = plain_hashing.count(); checksums.files[data_file_name].file_hash = plain_hashing.getHash(); - if (is_compress_marks) + if (compress_marks) { checksums.files[marks_file_name].is_compressed = true; checksums.files[marks_file_name].uncompressed_size = marks_compressed.count(); diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h index d641730e781..2cf874b37d2 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h @@ -89,7 +89,7 @@ private: HashingWriteBuffer marks_hashing; CompressedWriteBuffer marks_compressed_buf; HashingWriteBuffer marks_compressed; - bool is_compress_marks; + bool compress_marks; }; } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp index a4810957ef6..083393747df 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp @@ -17,7 +17,7 @@ void MergeTreeDataPartWriterOnDisk::Stream::preFinalize() /// 'compressed_buf' doesn't call next() on underlying buffer ('plain_hashing'). We should do it manually. plain_hashing.next(); - if (is_compress_marks) + if (compress_marks) marks_compressed.next(); marks_hashing.next(); @@ -66,7 +66,7 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream( marks_hashing(*marks_file), marks_compressed_buf(marks_hashing, marks_compression_codec_, marks_compress_block_size_), marks_compressed(marks_compressed_buf), - is_compress_marks(isCompressedFromMrkExtension(marks_file_extension)) + compress_marks(isCompressedFromMrkExtension(marks_file_extension)) { } @@ -80,7 +80,7 @@ void MergeTreeDataPartWriterOnDisk::Stream::addToChecksums(MergeTreeData::DataPa checksums.files[name + data_file_extension].file_size = plain_hashing.count(); checksums.files[name + data_file_extension].file_hash = plain_hashing.getHash(); - if (is_compress_marks) + if (compress_marks) { checksums.files[name + marks_file_extension].is_compressed = true; checksums.files[name + marks_file_extension].uncompressed_size = marks_compressed.count(); @@ -275,12 +275,12 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block if (stream.compressed.offset() >= settings.min_compress_block_size) stream.compressed.next(); - writeIntBinary(stream.plain_hashing.count(), stream.is_compress_marks ? stream.marks_compressed : stream.marks_hashing); - writeIntBinary(stream.compressed.offset(), stream.is_compress_marks ? stream.marks_compressed : stream.marks_hashing); + writeIntBinary(stream.plain_hashing.count(), stream.compress_marks ? stream.marks_compressed : stream.marks_hashing); + writeIntBinary(stream.compressed.offset(), stream.compress_marks ? stream.marks_compressed : stream.marks_hashing); /// Actually this numbers is redundant, but we have to store them /// to be compatible with normal .mrk2 file format if (settings.can_use_adaptive_granularity) - writeIntBinary(1UL, stream.is_compress_marks ? stream.marks_compressed : stream.marks_hashing); + writeIntBinary(1UL, stream.compress_marks ? stream.marks_compressed : stream.marks_hashing); } size_t pos = granule.start_row; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h index fe451239443..4c3f263cd45 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h @@ -76,7 +76,7 @@ public: HashingWriteBuffer marks_hashing; CompressedWriteBuffer marks_compressed_buf; HashingWriteBuffer marks_compressed; - bool is_compress_marks; + bool compress_marks; bool is_prefinalized = false; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 68d9c7f0ff3..a93968fb7bf 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -272,10 +272,10 @@ void MergeTreeDataPartWriterWide::writeSingleMark( void MergeTreeDataPartWriterWide::flushMarkToFile(const StreamNameAndMark & stream_with_mark, size_t rows_in_mark) { Stream & stream = *column_streams[stream_with_mark.stream_name]; - writeIntBinary(stream_with_mark.mark.offset_in_compressed_file, stream.is_compress_marks ? stream.marks_compressed : stream.marks_hashing); - writeIntBinary(stream_with_mark.mark.offset_in_decompressed_block, stream.is_compress_marks ? stream.marks_compressed : stream.marks_hashing); + writeIntBinary(stream_with_mark.mark.offset_in_compressed_file, stream.compress_marks ? stream.marks_compressed : stream.marks_hashing); + writeIntBinary(stream_with_mark.mark.offset_in_decompressed_block, stream.compress_marks ? stream.marks_compressed : stream.marks_hashing); if (settings.can_use_adaptive_granularity) - writeIntBinary(rows_in_mark, stream.is_compress_marks ? stream.marks_compressed : stream.marks_hashing); + writeIntBinary(rows_in_mark, stream.compress_marks ? stream.marks_compressed : stream.marks_hashing); } StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn( diff --git a/src/Storages/MergeTree/MergeTreeMarksLoader.cpp b/src/Storages/MergeTree/MergeTreeMarksLoader.cpp index 86a0fd722de..e0b4d51ecf1 100644 --- a/src/Storages/MergeTree/MergeTreeMarksLoader.cpp +++ b/src/Storages/MergeTree/MergeTreeMarksLoader.cpp @@ -29,7 +29,7 @@ MergeTreeMarksLoader::MergeTreeMarksLoader( : data_part_storage(std::move(data_part_storage_)) , mark_cache(mark_cache_) , mrk_path(mrk_path_) - , is_compress_marks(isCompressedFromMrkExtension(fs::path(mrk_path_).extension())) + , compress_marks(isCompressedFromMrkExtension(fs::path(mrk_path_).extension())) , marks_count(marks_count_) , index_granularity_info(index_granularity_info_) , save_marks_in_cache(save_marks_in_cache_) @@ -64,7 +64,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl() auto res = std::make_shared(marks_count * columns_in_mark); - if (!is_compress_marks && expected_uncompressed_size != file_size) + if (!compress_marks && expected_uncompressed_size != file_size) throw Exception( ErrorCodes::CORRUPTED_DATA, "Bad size of marks file '{}': {}, must be: {}", @@ -73,7 +73,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl() auto buffer = data_part_storage->readFile(mrk_path, read_settings.adjustBufferSize(file_size), file_size, std::nullopt); std::unique_ptr reader; - if (!is_compress_marks) + if (!compress_marks) reader = std::move(buffer); else reader = std::make_unique(std::move(buffer)); diff --git a/src/Storages/MergeTree/MergeTreeMarksLoader.h b/src/Storages/MergeTree/MergeTreeMarksLoader.h index 3c591b702c3..c8261e021d4 100644 --- a/src/Storages/MergeTree/MergeTreeMarksLoader.h +++ b/src/Storages/MergeTree/MergeTreeMarksLoader.h @@ -31,7 +31,7 @@ private: DataPartStoragePtr data_part_storage; MarkCache * mark_cache = nullptr; String mrk_path; - bool is_compress_marks; + bool compress_marks; size_t marks_count; const MergeTreeIndexGranularityInfo & index_granularity_info; bool save_marks_in_cache = false; From b4eec0e6f4cc82b814c14549008a9abfde64d2b8 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 5 Sep 2022 06:31:19 +0200 Subject: [PATCH 2/5] Fix strange code --- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 74 +++++++++++++++-- src/Storages/MergeTree/IMergeTreeDataPart.h | 21 ++++- .../MergeTree/IPartMetadataManager.cpp | 4 +- src/Storages/MergeTree/MergeTreeData.cpp | 20 +---- .../MergeTreeDataPartWriterCompact.cpp | 60 +++++++++----- .../MergeTreeDataPartWriterCompact.h | 16 ++-- .../MergeTreeDataPartWriterOnDisk.cpp | 79 ++++++++++--------- .../MergeTree/MergeTreeDataPartWriterOnDisk.h | 18 ++--- .../MergeTree/MergeTreeDataPartWriterWide.cpp | 23 +++--- .../MergeTreeIndexGranularityInfo.cpp | 15 +--- .../MergeTree/MergeTreeIndexGranularityInfo.h | 5 +- .../MergeTree/MergeTreeMarksLoader.cpp | 8 +- src/Storages/MergeTree/MergeTreeMarksLoader.h | 1 - 13 files changed, 219 insertions(+), 125 deletions(-) diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index fbcc2bb2a50..fb1f2672cac 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -65,8 +65,10 @@ namespace ErrorCodes extern const int BAD_TTL_FILE; extern const int NOT_IMPLEMENTED; extern const int NO_SUCH_COLUMN_IN_TABLE; + extern const int INCORRECT_FILE_NAME; } + void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const PartMetadataManagerPtr & manager) { auto metadata_snapshot = data.getInMemoryMetadataPtr(); @@ -1807,12 +1809,74 @@ bool isCompressedFromIndexExtension(const String & index_extension) return index_extension == getIndexExtension(true); } -bool isCompressedFromMrkExtension(const String & mrk_extension) +MarkType::MarkType(std::string_view extension) { - return mrk_extension == getNonAdaptiveMrkExtension(true) - || mrk_extension == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, true) - || mrk_extension == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, true) - || mrk_extension == getAdaptiveMrkExtension(MergeTreeDataPartType::InMemory, true); + if (extension.starts_with('c')) + { + compressed = true; + extension = extension.substr(1); + } + + if (!extension.starts_with("mrk")) + throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Mark file extension does not start with .mrk or .cmrk"); + + extension = extension.substr(strlen("mrk")); + + if (extension.empty()) + { + adaptive = false; + part_type = MergeTreeDataPartType::Wide; + } + else if (extension == "2") + { + adaptive = true; + part_type = MergeTreeDataPartType::Wide; + } + else if (extension == "3") + { + adaptive = true; + part_type = MergeTreeDataPartType::Compact; + } + else + throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Unknown mark file extension: '{}'", extension); +} + +MarkType::MarkType(bool adaptive_, bool compressed_, MergeTreeDataPartType::Value part_type_) + : adaptive(adaptive_), compressed(compressed_), part_type(part_type_) +{ + if (!adaptive && part_type != MergeTreeDataPartType::Wide) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: non-Wide data part type with non-adaptive granularity"); + if (part_type == MergeTreeDataPartType::Unknown) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown data part type"); +} + +bool MarkType::isMarkFileExtension(std::string_view extension) +{ + return extension.find("mrk") != std::string_view::npos; +} + +std::string MarkType::getFileExtension() +{ + std::string res = compressed ? "cmrk" : "mrk"; + + if (!adaptive) + { + if (part_type != MergeTreeDataPartType::Wide) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: non-Wide data part type with non-adaptive granularity"); + return res; + } + + switch (part_type) + { + case MergeTreeDataPartType::Wide: + return res + "2"; + case MergeTreeDataPartType::Compact: + return res + "3"; + case MergeTreeDataPartType::InMemory: + return ""; + case MergeTreeDataPartType::Unknown: + throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown data part type"); + } } } diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index 0bda9a8f3a7..c5befb76def 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -23,6 +24,7 @@ #include + namespace zkutil { class ZooKeeper; @@ -585,6 +587,23 @@ bool isInMemoryPart(const MergeTreeDataPartPtr & data_part); inline String getIndexExtension(bool is_compressed_primary_key) { return is_compressed_primary_key ? ".cidx" : ".idx"; } std::optional getIndexExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage); bool isCompressedFromIndexExtension(const String & index_extension); -bool isCompressedFromMrkExtension(const String & mrk_extension); + + +/** Various types of mark files are stored in files with various extensions: + * .mrk, .mrk2, .mrk3, .cmrk, .cmrk2, .cmrk3. + * This helper allows to obtain mark type from file extension and vise versa. + */ +struct MarkType +{ + MarkType(std::string_view extension); + MarkType(bool adaptive_, bool compressed_, MergeTreeDataPartType::Value part_type_); + + static bool isMarkFileExtension(std::string_view extension); + std::string getFileExtension(); + + bool adaptive = false; + bool compressed = false; + MergeTreeDataPartType::Value part_type = MergeTreeDataPartType::Unknown; +}; } diff --git a/src/Storages/MergeTree/IPartMetadataManager.cpp b/src/Storages/MergeTree/IPartMetadataManager.cpp index 10b5aa2a9f5..8aecb3cd7d4 100644 --- a/src/Storages/MergeTree/IPartMetadataManager.cpp +++ b/src/Storages/MergeTree/IPartMetadataManager.cpp @@ -13,8 +13,8 @@ IPartMetadataManager::IPartMetadataManager(const IMergeTreeDataPart * part_) : p bool IPartMetadataManager::isCompressedFromFileName(const String & file_name) { - const auto & extension = fs::path(file_name).extension(); - return isCompressedFromMrkExtension(extension) || isCompressedFromIndexExtension(extension); + std::string extension = fs::path(file_name).extension(); + return MarkType(extension).compressed || isCompressedFromIndexExtension(extension); } } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 5d06a4d1234..c166353b486 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -154,6 +154,7 @@ namespace ErrorCodes extern const int CANNOT_RESTORE_TABLE; } + static void checkSampleExpression(const StorageInMemoryMetadata & metadata, bool allow_sampling_expression_not_in_primary_key, bool check_sample_column_is_correct) { if (metadata.sampling_key.column_names.empty()) @@ -2642,21 +2643,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name, throw Exception("Unknown type of part " + data_part_storage->getRelativePath(), ErrorCodes::UNKNOWN_PART_TYPE); } -static MergeTreeDataPartType getPartTypeFromMarkExtension(const String & mrk_ext) -{ - if (mrk_ext == getNonAdaptiveMrkExtension(true) - || mrk_ext == getNonAdaptiveMrkExtension(false)) - return MergeTreeDataPartType::Wide; - if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, true) - || mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, false)) - return MergeTreeDataPartType::Wide; - if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, true) - || mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, false)) - return MergeTreeDataPartType::Compact; - - throw Exception("Can't determine part type, because of unknown mark extension " + mrk_ext, ErrorCodes::UNKNOWN_PART_TYPE); -} - MergeTreeData::MutableDataPartPtr MergeTreeData::createPart( const String & name, const DataPartStoragePtr & data_part_storage, const IMergeTreeDataPart * parent_part) const { @@ -2671,7 +2657,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart( auto mrk_ext = MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(data_part_storage); if (mrk_ext) - type = getPartTypeFromMarkExtension(*mrk_ext); + { + type = MarkType(*mrk_ext).part_type; + } else { /// Didn't find any mark file, suppose that part is empty. diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp index bfccb3f8709..03a630d2c80 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp @@ -27,15 +27,24 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact( settings.max_compress_block_size, settings_.query_write_settings)) , plain_hashing(*plain_file) - , marks_file(data_part_storage_builder->writeFile( +{ + marks_file = data_part_storage_builder->writeFile( MergeTreeDataPartCompact::DATA_FILE_NAME + marks_file_extension_, 4096, - settings_.query_write_settings)) - , marks_hashing(*marks_file) - , marks_compressed_buf(marks_hashing, settings_.getMarksCompressionCodec(), settings_.marks_compress_block_size) - , marks_compressed(marks_compressed_buf) - , compress_marks(isCompressedFromMrkExtension(marks_file_extension)) -{ + settings_.query_write_settings); + + marks_file_hashing = std::make_unique(*marks_file); + + if (MarkType(marks_file_extension).compressed) + { + marks_compressor = std::make_unique( + *marks_file_hashing, + settings_.getMarksCompressionCodec(), + settings_.marks_compress_block_size); + + marks_source_hashing = std::make_unique(*marks_compressor); + } + const auto & storage_columns = metadata_snapshot->getColumns(); for (const auto & column : columns_list) addStreams(column, storage_columns.getCodecDescOrDefault(column.name, default_codec)); @@ -176,6 +185,8 @@ void MergeTreeDataPartWriterCompact::writeDataBlockPrimaryIndexAndSkipIndices(co void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const Granules & granules) { + WriteBuffer & marks_out = marks_source_hashing ? *marks_source_hashing : *marks_file_hashing; + for (const auto & granule : granules) { data_written = true; @@ -207,8 +218,8 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G }; - writeIntBinary(plain_hashing.count(), compress_marks ? marks_compressed : marks_hashing); - writeIntBinary(static_cast(0), compress_marks ? marks_compressed : marks_hashing); + writeIntBinary(plain_hashing.count(), marks_out); + writeIntBinary(static_cast(0), marks_out); writeColumnSingleGranule( block.getByName(name_and_type->name), data_part->getSerialization(name_and_type->name), @@ -218,7 +229,7 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G prev_stream->hashing_buf.next(); //-V522 } - writeIntBinary(granule.rows_to_write, compress_marks ? marks_compressed : marks_hashing); + writeIntBinary(granule.rows_to_write, marks_out); } } @@ -243,21 +254,26 @@ void MergeTreeDataPartWriterCompact::fillDataChecksums(IMergeTreeDataPart::Check assert(stream->hashing_buf.offset() == 0); #endif + WriteBuffer & marks_out = marks_source_hashing ? *marks_source_hashing : *marks_file_hashing; + if (with_final_mark && data_written) { for (size_t i = 0; i < columns_list.size(); ++i) { - writeIntBinary(plain_hashing.count(), compress_marks ? marks_compressed : marks_hashing); - writeIntBinary(static_cast(0), compress_marks ? marks_compressed : marks_hashing); + writeIntBinary(plain_hashing.count(), marks_out); + writeIntBinary(static_cast(0), marks_out); } - writeIntBinary(static_cast(0), compress_marks ? marks_compressed : marks_hashing); + writeIntBinary(static_cast(0), marks_out); } plain_file->next(); - if (compress_marks) - marks_compressed.next(); - marks_hashing.next(); + if (marks_source_hashing) + marks_source_hashing->next(); + if (marks_compressor) + marks_compressor->next(); + + marks_file_hashing->next(); addToChecksums(checksums); plain_file->preFinalize(); @@ -268,6 +284,7 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(bool sync) { plain_file->finalize(); marks_file->finalize(); + if (sync) { plain_file->sync(); @@ -339,14 +356,15 @@ void MergeTreeDataPartWriterCompact::addToChecksums(MergeTreeDataPartChecksums & checksums.files[data_file_name].file_size = plain_hashing.count(); checksums.files[data_file_name].file_hash = plain_hashing.getHash(); - if (compress_marks) + if (marks_compressor) { checksums.files[marks_file_name].is_compressed = true; - checksums.files[marks_file_name].uncompressed_size = marks_compressed.count(); - checksums.files[marks_file_name].uncompressed_hash = marks_compressed.getHash(); + checksums.files[marks_file_name].uncompressed_size = marks_source_hashing->count(); + checksums.files[marks_file_name].uncompressed_hash = marks_source_hashing->getHash(); } - checksums.files[marks_file_name].file_size = marks_hashing.count(); - checksums.files[marks_file_name].file_hash = marks_hashing.getHash(); + + checksums.files[marks_file_name].file_size = marks_file_hashing->count(); + checksums.files[marks_file_name].file_hash = marks_file_hashing->getHash(); } void MergeTreeDataPartWriterCompact::ColumnsBuffer::add(MutableColumns && columns) diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h index 2cf874b37d2..7b68f61925f 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h @@ -1,6 +1,8 @@ #pragma once + #include + namespace DB { @@ -84,12 +86,16 @@ private: /// Stream for each column's substreams path (look at addStreams). std::unordered_map compressed_streams; - /// marks -> marks_file -> marks_compressed_buf -> marks_compressed + /// If marks are uncompressed, the data is written to 'marks_file_hashing' for hash calculation and then to the 'marks_file'. std::unique_ptr marks_file; - HashingWriteBuffer marks_hashing; - CompressedWriteBuffer marks_compressed_buf; - HashingWriteBuffer marks_compressed; - bool compress_marks; + std::unique_ptr marks_file_hashing; + + /// If marks are compressed, the data is written to 'marks_source_hashing' for hash calculation, + /// then to 'marks_compressor' for compression, + /// then to 'marks_file_hashing' for calculation of hash of compressed data, + /// then finally to 'marks_file'. + std::unique_ptr marks_compressor; + std::unique_ptr marks_source_hashing; }; } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp index 083393747df..1d2b095330e 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp @@ -13,12 +13,15 @@ namespace ErrorCodes void MergeTreeDataPartWriterOnDisk::Stream::preFinalize() { - compressed.next(); - /// 'compressed_buf' doesn't call next() on underlying buffer ('plain_hashing'). We should do it manually. + compressed_hashing.next(); + compressor.next(); plain_hashing.next(); if (compress_marks) - marks_compressed.next(); + { + marks_compressed_hashing.next(); + marks_compressor.next(); + } marks_hashing.next(); @@ -60,13 +63,13 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream( marks_file_extension{marks_file_extension_}, plain_file(data_part_storage_builder->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)), plain_hashing(*plain_file), - compressed_buf(plain_hashing, compression_codec_, max_compress_block_size_), - compressed(compressed_buf), + compressor(plain_hashing, compression_codec_, max_compress_block_size_), + compressed_hashing(compressor), marks_file(data_part_storage_builder->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings)), marks_hashing(*marks_file), - marks_compressed_buf(marks_hashing, marks_compression_codec_, marks_compress_block_size_), - marks_compressed(marks_compressed_buf), - compress_marks(isCompressedFromMrkExtension(marks_file_extension)) + marks_compressor(marks_hashing, marks_compression_codec_, marks_compress_block_size_), + marks_compressed_hashing(marks_compressor), + compress_marks(MarkType(marks_file_extension).compressed) { } @@ -75,17 +78,18 @@ void MergeTreeDataPartWriterOnDisk::Stream::addToChecksums(MergeTreeData::DataPa String name = escaped_column_name; checksums.files[name + data_file_extension].is_compressed = true; - checksums.files[name + data_file_extension].uncompressed_size = compressed.count(); - checksums.files[name + data_file_extension].uncompressed_hash = compressed.getHash(); + checksums.files[name + data_file_extension].uncompressed_size = compressed_hashing.count(); + checksums.files[name + data_file_extension].uncompressed_hash = compressed_hashing.getHash(); checksums.files[name + data_file_extension].file_size = plain_hashing.count(); checksums.files[name + data_file_extension].file_hash = plain_hashing.getHash(); if (compress_marks) { checksums.files[name + marks_file_extension].is_compressed = true; - checksums.files[name + marks_file_extension].uncompressed_size = marks_compressed.count(); - checksums.files[name + marks_file_extension].uncompressed_hash = marks_compressed.getHash(); + checksums.files[name + marks_file_extension].uncompressed_size = marks_compressed_hashing.count(); + checksums.files[name + marks_file_extension].uncompressed_hash = marks_compressed_hashing.getHash(); } + checksums.files[name + marks_file_extension].file_size = marks_hashing.count(); checksums.files[name + marks_file_extension].file_hash = marks_hashing.getHash(); } @@ -175,15 +179,15 @@ void MergeTreeDataPartWriterOnDisk::initPrimaryIndex() { String index_name = "primary" + getIndexExtension(compress_primary_key); index_file_stream = data_part_storage_builder->writeFile(index_name, DBMS_DEFAULT_BUFFER_SIZE, settings.query_write_settings); - index_hashing_stream = std::make_unique(*index_file_stream); + index_file_hashing_stream = std::make_unique(*index_file_stream); if (compress_primary_key) { ParserCodec codec_parser; auto ast = parseQuery(codec_parser, "(" + Poco::toUpper(settings.primary_key_compression_codec) + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH); CompressionCodecPtr primary_key_compression_codec = CompressionCodecFactory::instance().get(ast, nullptr); - index_compressed_buf = std::make_unique(*index_hashing_stream, primary_key_compression_codec, settings.primary_key_compress_block_size); - index_compressed_stream = std::make_unique(*index_compressed_buf); + index_compressor_stream = std::make_unique(*index_file_hashing_stream, primary_key_compression_codec, settings.primary_key_compress_block_size); + index_source_hashing_stream = std::make_unique(*index_compressor_stream); } } } @@ -242,7 +246,7 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Bloc const auto & primary_column = primary_index_block.getByPosition(j); index_columns[j]->insertFrom(*primary_column.column, granule.start_row); primary_column.type->getDefaultSerialization()->serializeBinary( - *primary_column.column, granule.start_row, compress_primary_key ? *index_compressed_stream : *index_hashing_stream); + *primary_column.column, granule.start_row, compress_primary_key ? *index_source_hashing_stream : *index_file_hashing_stream); } } } @@ -260,11 +264,13 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block { const auto index_helper = skip_indices[i]; auto & stream = *skip_indices_streams[i]; + WriteBuffer & marks_out = stream.compress_marks ? stream.marks_compressed_hashing : stream.marks_hashing; + for (const auto & granule : granules_to_write) { if (skip_index_accumulated_marks[i] == index_helper->index.granularity) { - skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed); + skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed_hashing); skip_index_accumulated_marks[i] = 0; } @@ -272,15 +278,16 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block { skip_indices_aggregators[i] = index_helper->createIndexAggregator(); - if (stream.compressed.offset() >= settings.min_compress_block_size) - stream.compressed.next(); + if (stream.compressed_hashing.offset() >= settings.min_compress_block_size) + stream.compressed_hashing.next(); + + writeIntBinary(stream.plain_hashing.count(), marks_out); + writeIntBinary(stream.compressed_hashing.offset(), marks_out); - writeIntBinary(stream.plain_hashing.count(), stream.compress_marks ? stream.marks_compressed : stream.marks_hashing); - writeIntBinary(stream.compressed.offset(), stream.compress_marks ? stream.marks_compressed : stream.marks_hashing); /// Actually this numbers is redundant, but we have to store them - /// to be compatible with normal .mrk2 file format + /// to be compatible with the normal .mrk2 file format if (settings.can_use_adaptive_granularity) - writeIntBinary(1UL, stream.compress_marks ? stream.marks_compressed : stream.marks_hashing); + writeIntBinary(1UL, marks_out); } size_t pos = granule.start_row; @@ -297,7 +304,7 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat if (write_final_mark && compute_granularity) index_granularity.appendMark(0); - if (index_hashing_stream) + if (index_file_hashing_stream) { if (write_final_mark) { @@ -307,32 +314,32 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat size_t last_row_number = column.size() - 1; index_columns[j]->insertFrom(column, last_row_number); index_types[j]->getDefaultSerialization()->serializeBinary( - column, last_row_number, compress_primary_key ? *index_compressed_stream : *index_hashing_stream); + column, last_row_number, compress_primary_key ? *index_source_hashing_stream : *index_file_hashing_stream); } last_block_index_columns.clear(); } if (compress_primary_key) - index_compressed_stream->next(); + index_source_hashing_stream->next(); - index_hashing_stream->next(); + index_file_hashing_stream->next(); String index_name = "primary" + getIndexExtension(compress_primary_key); if (compress_primary_key) { checksums.files[index_name].is_compressed = true; - checksums.files[index_name].uncompressed_size = index_compressed_stream->count(); - checksums.files[index_name].uncompressed_hash = index_compressed_stream->getHash(); + checksums.files[index_name].uncompressed_size = index_source_hashing_stream->count(); + checksums.files[index_name].uncompressed_hash = index_source_hashing_stream->getHash(); } - checksums.files[index_name].file_size = index_hashing_stream->count(); - checksums.files[index_name].file_hash = index_hashing_stream->getHash(); + checksums.files[index_name].file_size = index_file_hashing_stream->count(); + checksums.files[index_name].file_hash = index_file_hashing_stream->getHash(); index_file_stream->preFinalize(); } } void MergeTreeDataPartWriterOnDisk::finishPrimaryIndexSerialization(bool sync) { - if (index_hashing_stream) + if (index_file_hashing_stream) { index_file_stream->finalize(); if (sync) @@ -340,10 +347,10 @@ void MergeTreeDataPartWriterOnDisk::finishPrimaryIndexSerialization(bool sync) if (compress_primary_key) { - index_compressed_stream = nullptr; - index_compressed_buf = nullptr; + index_source_hashing_stream = nullptr; + index_compressor_stream = nullptr; } - index_hashing_stream = nullptr; + index_file_hashing_stream = nullptr; } } @@ -353,7 +360,7 @@ void MergeTreeDataPartWriterOnDisk::fillSkipIndicesChecksums(MergeTreeData::Data { auto & stream = *skip_indices_streams[i]; if (!skip_indices_aggregators[i]->empty()) - skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed); + skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed_hashing); } for (auto & stream : skip_indices_streams) diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h index 4c3f263cd45..4b58224de78 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h @@ -65,17 +65,17 @@ public: std::string data_file_extension; std::string marks_file_extension; - /// compressed -> compressed_buf -> plain_hashing -> plain_file + /// compressed_hashing -> compressor -> plain_hashing -> plain_file std::unique_ptr plain_file; HashingWriteBuffer plain_hashing; - CompressedWriteBuffer compressed_buf; - HashingWriteBuffer compressed; + CompressedWriteBuffer compressor; + HashingWriteBuffer compressed_hashing; - /// marks -> marks_file -> marks_compressed_buf -> marks_compressed + /// marks_compressed_hashing -> marks_compressor -> marks_hashing -> marks_file std::unique_ptr marks_file; HashingWriteBuffer marks_hashing; - CompressedWriteBuffer marks_compressed_buf; - HashingWriteBuffer marks_compressed; + CompressedWriteBuffer marks_compressor; + HashingWriteBuffer marks_compressed_hashing; bool compress_marks; bool is_prefinalized = false; @@ -145,9 +145,9 @@ protected: std::vector skip_index_accumulated_marks; std::unique_ptr index_file_stream; - std::unique_ptr index_hashing_stream; - std::unique_ptr index_compressed_buf; - std::unique_ptr index_compressed_stream; + std::unique_ptr index_file_hashing_stream; + std::unique_ptr index_compressor_stream; + std::unique_ptr index_source_hashing_stream; bool compress_primary_key; DataTypes index_types; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index a93968fb7bf..14ff073a511 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -30,6 +30,7 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity, Granules result; size_t current_row = 0; + /// When our last mark is not finished yet and we have to write rows into it if (rows_written_in_last_mark > 0) { @@ -43,7 +44,7 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity, .is_complete = (rows_left_in_block >= rows_left_in_last_mark), }); current_row += result.back().rows_to_write; - current_mark++; + ++current_mark; } /// Calculating normal granules for block @@ -61,7 +62,7 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity, .is_complete = (rows_left_in_block >= expected_rows_in_mark), }); current_row += result.back().rows_to_write; - current_mark++; + ++current_mark; } return result; @@ -144,7 +145,7 @@ ISerialization::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGett if (is_offsets && offset_columns.contains(stream_name)) return nullptr; - return &column_streams.at(stream_name)->compressed; + return &column_streams.at(stream_name)->compressed_hashing; }; } @@ -272,10 +273,12 @@ void MergeTreeDataPartWriterWide::writeSingleMark( void MergeTreeDataPartWriterWide::flushMarkToFile(const StreamNameAndMark & stream_with_mark, size_t rows_in_mark) { Stream & stream = *column_streams[stream_with_mark.stream_name]; - writeIntBinary(stream_with_mark.mark.offset_in_compressed_file, stream.compress_marks ? stream.marks_compressed : stream.marks_hashing); - writeIntBinary(stream_with_mark.mark.offset_in_decompressed_block, stream.compress_marks ? stream.marks_compressed : stream.marks_hashing); + WriteBuffer & marks_out = stream.compress_marks ? stream.marks_compressed_hashing : stream.marks_hashing; + + writeIntBinary(stream_with_mark.mark.offset_in_compressed_file, marks_out); + writeIntBinary(stream_with_mark.mark.offset_in_decompressed_block, marks_out); if (settings.can_use_adaptive_granularity) - writeIntBinary(rows_in_mark, stream.compress_marks ? stream.marks_compressed : stream.marks_hashing); + writeIntBinary(rows_in_mark, marks_out); } StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn( @@ -297,13 +300,13 @@ StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn( Stream & stream = *column_streams[stream_name]; /// There could already be enough data to compress into the new block. - if (stream.compressed.offset() >= settings.min_compress_block_size) - stream.compressed.next(); + if (stream.compressed_hashing.offset() >= settings.min_compress_block_size) + stream.compressed_hashing.next(); StreamNameAndMark stream_with_mark; stream_with_mark.stream_name = stream_name; stream_with_mark.mark.offset_in_compressed_file = stream.plain_hashing.count(); - stream_with_mark.mark.offset_in_decompressed_block = stream.compressed.offset(); + stream_with_mark.mark.offset_in_decompressed_block = stream.compressed_hashing.offset(); result.push_back(stream_with_mark); }, path); @@ -333,7 +336,7 @@ void MergeTreeDataPartWriterWide::writeSingleGranule( if (is_offsets && offset_columns.contains(stream_name)) return; - column_streams[stream_name]->compressed.nextIfAtEnd(); + column_streams[stream_name]->compressed_hashing.nextIfAtEnd(); }, serialize_settings.path); } diff --git a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp index a07e2835a43..39d3f2d7d82 100644 --- a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp @@ -16,19 +16,10 @@ namespace ErrorCodes std::optional MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage) { if (data_part_storage->exists()) - { for (auto it = data_part_storage->iterate(); it->isValid(); it->next()) - { - const auto & ext = fs::path(it->name()).extension(); - if (ext == getNonAdaptiveMrkExtension(false) - || ext == getNonAdaptiveMrkExtension(true) - || ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, false) - || ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, true) - || ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, false) - || ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, true)) - return ext; - } - } + if (it->isFile()) + if (std::string ext = fs::path(it->name()).extension(); MarkType::isMarkFileExtension(ext)) + return ext; return {}; } diff --git a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h index 19843f58746..0bfd404c158 100644 --- a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h +++ b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h @@ -42,10 +42,7 @@ public: String getMarksFilePath(const DataPartStoragePtr & data_part_storage, const String & path_prefix) const { auto mrk_ext = getMarksExtensionFromFilesystem(data_part_storage); - if (mrk_ext) - return path_prefix + *mrk_ext; - - return path_prefix + marks_file_extension; + return path_prefix + mrk_ext.value_or(marks_file_extension); } size_t getMarkSizeInBytes(size_t columns_num = 1) const; diff --git a/src/Storages/MergeTree/MergeTreeMarksLoader.cpp b/src/Storages/MergeTree/MergeTreeMarksLoader.cpp index e0b4d51ecf1..3d3532ff8d0 100644 --- a/src/Storages/MergeTree/MergeTreeMarksLoader.cpp +++ b/src/Storages/MergeTree/MergeTreeMarksLoader.cpp @@ -29,7 +29,6 @@ MergeTreeMarksLoader::MergeTreeMarksLoader( : data_part_storage(std::move(data_part_storage_)) , mark_cache(mark_cache_) , mrk_path(mrk_path_) - , compress_marks(isCompressedFromMrkExtension(fs::path(mrk_path_).extension())) , marks_count(marks_count_) , index_granularity_info(index_granularity_info_) , save_marks_in_cache(save_marks_in_cache_) @@ -62,9 +61,12 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl() size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark); size_t expected_uncompressed_size = mark_size * marks_count; + std::string file_extension = fs::path(mrk_path).extension(); + bool compressed_marks = MarkType(file_extension).compressed; + auto res = std::make_shared(marks_count * columns_in_mark); - if (!compress_marks && expected_uncompressed_size != file_size) + if (!compressed_marks && expected_uncompressed_size != file_size) throw Exception( ErrorCodes::CORRUPTED_DATA, "Bad size of marks file '{}': {}, must be: {}", @@ -73,7 +75,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl() auto buffer = data_part_storage->readFile(mrk_path, read_settings.adjustBufferSize(file_size), file_size, std::nullopt); std::unique_ptr reader; - if (!compress_marks) + if (!compressed_marks) reader = std::move(buffer); else reader = std::make_unique(std::move(buffer)); diff --git a/src/Storages/MergeTree/MergeTreeMarksLoader.h b/src/Storages/MergeTree/MergeTreeMarksLoader.h index c8261e021d4..98323fbe6b5 100644 --- a/src/Storages/MergeTree/MergeTreeMarksLoader.h +++ b/src/Storages/MergeTree/MergeTreeMarksLoader.h @@ -31,7 +31,6 @@ private: DataPartStoragePtr data_part_storage; MarkCache * mark_cache = nullptr; String mrk_path; - bool compress_marks; size_t marks_count; const MergeTreeIndexGranularityInfo & index_granularity_info; bool save_marks_in_cache = false; From d7127e4b2db6def090931a3cf9dc59cd9d90a9ab Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 5 Sep 2022 07:26:58 +0200 Subject: [PATCH 3/5] Make it slightly more sane --- .../QueryPlan/ReadFromMergeTree.cpp | 2 +- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 73 +--------- src/Storages/MergeTree/IMergeTreeDataPart.h | 20 +-- .../MergeTree/IMergedBlockOutputStream.cpp | 1 + src/Storages/MergeTree/MergeTreeData.cpp | 6 +- .../MergeTree/MergeTreeDataPartCompact.cpp | 12 +- .../MergeTree/MergeTreeDataPartWide.cpp | 70 +++++---- .../MergeTree/MergeTreeDataPartWriterWide.cpp | 2 +- .../MergeTreeIndexGranularityInfo.cpp | 134 ++++++++++++------ .../MergeTree/MergeTreeIndexGranularityInfo.h | 40 +++--- .../MergeTree/MergeTreeMarksLoader.cpp | 2 +- .../MergeTree/MergedBlockOutputStream.cpp | 2 +- .../MergedColumnOnlyOutputStream.cpp | 4 +- src/Storages/MergeTree/MutateTask.cpp | 2 +- .../System/StorageSystemPartsColumns.cpp | 2 +- 15 files changed, 162 insertions(+), 210 deletions(-) diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 1f6c6ee2a3f..a6100fe42c4 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -341,7 +341,7 @@ struct PartRangesReadInfo sum_marks_in_parts[i] = parts[i].getMarksCount(); sum_marks += sum_marks_in_parts[i]; - if (parts[i].data_part->index_granularity_info.is_adaptive) + if (parts[i].data_part->index_granularity_info.mark_type.adaptive) ++adaptive_parts; } diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index fb1f2672cac..d61b23c67a2 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -65,7 +65,6 @@ namespace ErrorCodes extern const int BAD_TTL_FILE; extern const int NOT_IMPLEMENTED; extern const int NO_SUCH_COLUMN_IN_TABLE; - extern const int INCORRECT_FILE_NAME; } @@ -1629,7 +1628,7 @@ void IMergeTreeDataPart::calculateSecondaryIndicesSizesOnDisk() auto index_name_escaped = escapeForFileName(index_name); auto index_file_name = index_name_escaped + index_ptr->getSerializedFileExtension(); - auto index_marks_file_name = index_name_escaped + index_granularity_info.marks_file_extension; + auto index_marks_file_name = index_name_escaped + getMarksFileExtension(); /// If part does not contain index auto bin_checksum = checksums.files.find(index_file_name); @@ -1809,74 +1808,4 @@ bool isCompressedFromIndexExtension(const String & index_extension) return index_extension == getIndexExtension(true); } -MarkType::MarkType(std::string_view extension) -{ - if (extension.starts_with('c')) - { - compressed = true; - extension = extension.substr(1); - } - - if (!extension.starts_with("mrk")) - throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Mark file extension does not start with .mrk or .cmrk"); - - extension = extension.substr(strlen("mrk")); - - if (extension.empty()) - { - adaptive = false; - part_type = MergeTreeDataPartType::Wide; - } - else if (extension == "2") - { - adaptive = true; - part_type = MergeTreeDataPartType::Wide; - } - else if (extension == "3") - { - adaptive = true; - part_type = MergeTreeDataPartType::Compact; - } - else - throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Unknown mark file extension: '{}'", extension); -} - -MarkType::MarkType(bool adaptive_, bool compressed_, MergeTreeDataPartType::Value part_type_) - : adaptive(adaptive_), compressed(compressed_), part_type(part_type_) -{ - if (!adaptive && part_type != MergeTreeDataPartType::Wide) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: non-Wide data part type with non-adaptive granularity"); - if (part_type == MergeTreeDataPartType::Unknown) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown data part type"); -} - -bool MarkType::isMarkFileExtension(std::string_view extension) -{ - return extension.find("mrk") != std::string_view::npos; -} - -std::string MarkType::getFileExtension() -{ - std::string res = compressed ? "cmrk" : "mrk"; - - if (!adaptive) - { - if (part_type != MergeTreeDataPartType::Wide) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: non-Wide data part type with non-adaptive granularity"); - return res; - } - - switch (part_type) - { - case MergeTreeDataPartType::Wide: - return res + "2"; - case MergeTreeDataPartType::Compact: - return res + "3"; - case MergeTreeDataPartType::InMemory: - return ""; - case MergeTreeDataPartType::Unknown: - throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown data part type"); - } -} - } diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index c5befb76def..a0f20a4f09d 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -160,7 +160,7 @@ public: void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency); void appendFilesOfColumnsChecksumsIndexes(Strings & files, bool include_projection = false) const; - String getMarksFileExtension() const { return index_granularity_info.marks_file_extension; } + String getMarksFileExtension() const { return index_granularity_info.mark_type.getFileExtension(); } /// Generate the new name for this part according to `new_part_info` and min/max dates from the old name. /// This is useful when you want to change e.g. block numbers or the mutation version of the part. @@ -588,22 +588,4 @@ inline String getIndexExtension(bool is_compressed_primary_key) { return is_comp std::optional getIndexExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage); bool isCompressedFromIndexExtension(const String & index_extension); - -/** Various types of mark files are stored in files with various extensions: - * .mrk, .mrk2, .mrk3, .cmrk, .cmrk2, .cmrk3. - * This helper allows to obtain mark type from file extension and vise versa. - */ -struct MarkType -{ - MarkType(std::string_view extension); - MarkType(bool adaptive_, bool compressed_, MergeTreeDataPartType::Value part_type_); - - static bool isMarkFileExtension(std::string_view extension); - std::string getFileExtension(); - - bool adaptive = false; - bool compressed = false; - MergeTreeDataPartType::Value part_type = MergeTreeDataPartType::Unknown; -}; - } diff --git a/src/Storages/MergeTree/IMergedBlockOutputStream.cpp b/src/Storages/MergeTree/IMergedBlockOutputStream.cpp index 9be49a9bba4..5af9bbd3ed8 100644 --- a/src/Storages/MergeTree/IMergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/IMergedBlockOutputStream.cpp @@ -106,6 +106,7 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart( if (remove_it != columns.end()) columns.erase(remove_it); } + return remove_files; } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index c166353b486..26241922512 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -1072,7 +1072,7 @@ void MergeTreeData::loadDataPartsFromDisk( suspicious_broken_parts_bytes += size_of_part; return; } - if (!part->index_granularity_info.is_adaptive) + if (!part->index_granularity_info.mark_type.adaptive) has_non_adaptive_parts.store(true, std::memory_order_relaxed); else has_adaptive_parts.store(true, std::memory_order_relaxed); @@ -6315,9 +6315,9 @@ bool MergeTreeData::canReplacePartition(const DataPartPtr & src_part) const if (!settings->enable_mixed_granularity_parts || settings->index_granularity_bytes == 0) { - if (!canUseAdaptiveGranularity() && src_part->index_granularity_info.is_adaptive) + if (!canUseAdaptiveGranularity() && src_part->index_granularity_info.mark_type.adaptive) return false; - if (canUseAdaptiveGranularity() && !src_part->index_granularity_info.is_adaptive) + if (canUseAdaptiveGranularity() && !src_part->index_granularity_info.mark_type.adaptive) return false; } return true; diff --git a/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp index 046a7d274c0..20f47d97d61 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp @@ -71,7 +71,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter( return std::make_unique( shared_from_this(), std::move(data_part_storage_builder), ordered_columns_list, metadata_snapshot, - indices_to_recalc, index_granularity_info.marks_file_extension, + indices_to_recalc, getMarksFileExtension(), default_codec_, writer_settings, computed_index_granularity); } @@ -85,19 +85,17 @@ void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*eac total_size.data_uncompressed += bin_checksum->second.uncompressed_size; } - auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + index_granularity_info.marks_file_extension); + auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + getMarksFileExtension()); if (mrk_checksum != checksums.files.end()) total_size.marks += mrk_checksum->second.file_size; } void MergeTreeDataPartCompact::loadIndexGranularity() { - //String full_path = getRelativePath(); - if (columns.empty()) throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART); - if (!index_granularity_info.is_adaptive) + if (!index_granularity_info.mark_type.adaptive) throw Exception("MergeTreeDataPartCompact cannot be created with non-adaptive granulary.", ErrorCodes::NOT_IMPLEMENTED); auto marks_file_path = index_granularity_info.getMarksFilePath("data"); @@ -131,7 +129,7 @@ bool MergeTreeDataPartCompact::hasColumnFiles(const NameAndTypePair & column) co return false; auto bin_checksum = checksums.files.find(DATA_FILE_NAME_WITH_EXTENSION); - auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + index_granularity_info.marks_file_extension); + auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + getMarksFileExtension()); return (bin_checksum != checksums.files.end() && mrk_checksum != checksums.files.end()); } @@ -139,7 +137,7 @@ bool MergeTreeDataPartCompact::hasColumnFiles(const NameAndTypePair & column) co void MergeTreeDataPartCompact::checkConsistency(bool require_part_metadata) const { checkConsistencyBase(); - String mrk_file_name = DATA_FILE_NAME + index_granularity_info.marks_file_extension; + String mrk_file_name = DATA_FILE_NAME + getMarksFileExtension(); if (!checksums.empty()) { diff --git a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp index c3ad0f0e7d1..e71059150f9 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp @@ -67,7 +67,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartWide::getWriter( return std::make_unique( shared_from_this(), data_part_storage_builder, columns_list, metadata_snapshot, indices_to_recalc, - index_granularity_info.marks_file_extension, + getMarksFileExtension(), default_codec_, writer_settings, computed_index_granularity); } @@ -95,7 +95,7 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl( size.data_uncompressed += bin_checksum->second.uncompressed_size; } - auto mrk_checksum = checksums.files.find(file_name + index_granularity_info.marks_file_extension); + auto mrk_checksum = checksums.files.find(file_name + getMarksFileExtension()); if (mrk_checksum != checksums.files.end()) size.marks += mrk_checksum->second.file_size; }); @@ -103,11 +103,11 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl( return size; } + void MergeTreeDataPartWide::loadIndexGranularity() { index_granularity_info.changeGranularityIfRequired(data_part_storage); - if (columns.empty()) throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART); @@ -119,55 +119,48 @@ void MergeTreeDataPartWide::loadIndexGranularity() std::string(fs::path(data_part_storage->getFullPath()) / marks_file_path)); size_t marks_file_size = data_part_storage->getFileSize(marks_file_path); - if (!index_granularity_info.compress_marks) - { - if (!index_granularity_info.is_adaptive) - { - size_t marks_count = marks_file_size / index_granularity_info.getMarkSizeInBytes(); - index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); /// all the same - } - else - { - auto buffer = data_part_storage->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt); - while (!buffer->eof()) - { - buffer->seek(sizeof(size_t) * 2, SEEK_CUR); /// skip offset_in_compressed file and offset_in_decompressed_block - size_t granularity; - readIntBinary(granularity, *buffer); - index_granularity.appendMark(granularity); - } - if (index_granularity.getMarksCount() * index_granularity_info.getMarkSizeInBytes() != marks_file_size) - throw Exception("Cannot read all marks from file " + data_part_storage->getFullPath() + "/" + marks_file_path, ErrorCodes::CANNOT_READ_ALL_DATA); - } + if (!index_granularity_info.mark_type.adaptive && !index_granularity_info.mark_type.compressed) + { + /// The most easy way - no need to read the file, everything is known from its size. + size_t marks_count = marks_file_size / index_granularity_info.getMarkSizeInBytes(); + index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); /// all the same } else { - CompressedReadBufferFromFile buffer( - data_part_storage->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt)); + auto marks_file = data_part_storage->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt); + + std::unique_ptr marks_reader; + if (!index_granularity_info.mark_type.compressed) + marks_reader = std::move(marks_file); + else + marks_reader = std::make_unique(std::move(marks_file)); - MarksInCompressedFile mark(1); size_t marks_count = 0; - while (!buffer.eof()) + while (!marks_reader->eof()) { - buffer.readStrict(reinterpret_cast(mark.data()), sizeof(size_t) * 2); /// skip offset_in_compressed file and offset_in_decompressed_block + MarkInCompressedFile mark; + size_t granularity; + + readBinary(mark.offset_in_compressed_file, *marks_reader); + readBinary(mark.offset_in_decompressed_block, *marks_reader); ++marks_count; - if (index_granularity_info.is_adaptive) + if (index_granularity_info.mark_type.adaptive) { - size_t granularity; - readIntBinary(granularity, buffer); + readIntBinary(granularity, *marks_reader); index_granularity.appendMark(granularity); } } - if (!index_granularity_info.is_adaptive) - index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); + if (!index_granularity_info.mark_type.adaptive) + index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); /// all the same } index_granularity.setInitialized(); } + bool MergeTreeDataPartWide::isStoredOnRemoteDisk() const { return data_part_storage->isStoredOnRemoteDisk(); @@ -186,7 +179,7 @@ MergeTreeDataPartWide::~MergeTreeDataPartWide() void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const { checkConsistencyBase(); - //String path = getRelativePath(); + std::string marks_file_extension = index_granularity_info.mark_type.getFileExtension(); if (!checksums.empty()) { @@ -197,7 +190,7 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const getSerialization(name_type.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path) { String file_name = ISerialization::getFileNameForStream(name_type, substream_path); - String mrk_file_name = file_name + index_granularity_info.marks_file_extension; + String mrk_file_name = file_name + marks_file_extension; String bin_file_name = file_name + DATA_FILE_EXTENSION; if (!checksums.files.contains(mrk_file_name)) @@ -223,7 +216,7 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const { getSerialization(name_type.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path) { - auto file_path = ISerialization::getFileNameForStream(name_type, substream_path) + index_granularity_info.marks_file_extension; + auto file_path = ISerialization::getFileNameForStream(name_type, substream_path) + marks_file_extension; /// Missing file is Ok for case when new column was added. if (data_part_storage->exists(file_path)) @@ -251,10 +244,11 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const bool MergeTreeDataPartWide::hasColumnFiles(const NameAndTypePair & column) const { - auto check_stream_exists = [this](const String & stream_name) + std::string marks_file_extension = index_granularity_info.mark_type.getFileExtension(); + auto check_stream_exists = [this, &marks_file_extension](const String & stream_name) { auto bin_checksum = checksums.files.find(stream_name + DATA_FILE_EXTENSION); - auto mrk_checksum = checksums.files.find(stream_name + index_granularity_info.marks_file_extension); + auto mrk_checksum = checksums.files.find(stream_name + marks_file_extension); return bin_checksum != checksums.files.end() && mrk_checksum != checksums.files.end(); }; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 14ff073a511..0bccc21e478 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -431,7 +431,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai auto mrk_file_in = data_part_storage->readFile(mrk_path, {}, std::nullopt, std::nullopt); std::unique_ptr mrk_in; - if (data_part->index_granularity_info.compress_marks) + if (data_part->index_granularity_info.mark_type.compressed) mrk_in = std::make_unique(std::move(mrk_file_in)); else mrk_in = std::move(mrk_file_in); diff --git a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp index 39d3f2d7d82..98a72eb13c0 100644 --- a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp @@ -9,10 +9,87 @@ namespace DB namespace ErrorCodes { + extern const int LOGICAL_ERROR; extern const int NOT_IMPLEMENTED; extern const int UNKNOWN_PART_TYPE; + extern const int INCORRECT_FILE_NAME; } + +MarkType::MarkType(std::string_view extension) +{ + if (extension.starts_with('.')) + extension = extension.substr(1); + + if (extension.starts_with('c')) + { + compressed = true; + extension = extension.substr(1); + } + + if (!extension.starts_with("mrk")) + throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Mark file extension does not start with .mrk or .cmrk"); + + extension = extension.substr(strlen("mrk")); + + if (extension.empty()) + { + adaptive = false; + part_type = MergeTreeDataPartType::Wide; + } + else if (extension == "2") + { + adaptive = true; + part_type = MergeTreeDataPartType::Wide; + } + else if (extension == "3") + { + adaptive = true; + part_type = MergeTreeDataPartType::Compact; + } + else + throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Unknown mark file extension: '{}'", extension); +} + +MarkType::MarkType(bool adaptive_, bool compressed_, MergeTreeDataPartType::Value part_type_) + : adaptive(adaptive_), compressed(compressed_), part_type(part_type_) +{ + if (!adaptive && part_type != MergeTreeDataPartType::Wide) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: non-Wide data part type with non-adaptive granularity"); + if (part_type == MergeTreeDataPartType::Unknown) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown data part type"); +} + +bool MarkType::isMarkFileExtension(std::string_view extension) +{ + return extension.find("mrk") != std::string_view::npos; +} + +std::string MarkType::getFileExtension() const +{ + std::string res = compressed ? ".cmrk" : ".mrk"; + + if (!adaptive) + { + if (part_type != MergeTreeDataPartType::Wide) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: non-Wide data part type with non-adaptive granularity"); + return res; + } + + switch (part_type) + { + case MergeTreeDataPartType::Wide: + return res + "2"; + case MergeTreeDataPartType::Compact: + return res + "3"; + case MergeTreeDataPartType::InMemory: + return ""; + case MergeTreeDataPartType::Unknown: + throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown data part type"); + } +} + + std::optional MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage) { if (data_part_storage->exists()) @@ -24,51 +101,28 @@ std::optional MergeTreeIndexGranularityInfo::getMarksExtensionFromF } MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_) - : type(type_) + : mark_type(storage.canUseAdaptiveGranularity(), storage.getSettings()->compress_marks, type_.getValue()) { - const auto storage_settings = storage.getSettings(); - fixed_index_granularity = storage_settings->index_granularity; - compress_marks = storage_settings->compress_marks; - - /// Granularity is fixed - if (!storage.canUseAdaptiveGranularity()) - { - if (type != MergeTreeDataPartType::Wide) - throw Exception("Only Wide parts can be used with non-adaptive granularity.", ErrorCodes::NOT_IMPLEMENTED); - setNonAdaptive(); - } - else - setAdaptive(storage_settings->index_granularity_bytes); + fixed_index_granularity = storage.getSettings()->index_granularity; } void MergeTreeIndexGranularityInfo::changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage) { auto mrk_ext = getMarksExtensionFromFilesystem(data_part_storage); - if (mrk_ext && *mrk_ext == getNonAdaptiveMrkExtension(compress_marks)) - setNonAdaptive(); -} - -void MergeTreeIndexGranularityInfo::setAdaptive(size_t index_granularity_bytes_) -{ - is_adaptive = true; - marks_file_extension = getAdaptiveMrkExtension(type, compress_marks); - index_granularity_bytes = index_granularity_bytes_; -} - -void MergeTreeIndexGranularityInfo::setNonAdaptive() -{ - is_adaptive = false; - marks_file_extension = getNonAdaptiveMrkExtension(compress_marks); - index_granularity_bytes = 0; + if (mrk_ext && !MarkType(*mrk_ext).adaptive) + { + mark_type.adaptive = false; + index_granularity_bytes = 0; + } } size_t MergeTreeIndexGranularityInfo::getMarkSizeInBytes(size_t columns_num) const { - if (type == MergeTreeDataPartType::Wide) - return is_adaptive ? getAdaptiveMrkSizeWide() : getNonAdaptiveMrkSizeWide(); - else if (type == MergeTreeDataPartType::Compact) + if (mark_type.part_type == MergeTreeDataPartType::Wide) + return mark_type.adaptive ? getAdaptiveMrkSizeWide() : getNonAdaptiveMrkSizeWide(); + else if (mark_type.part_type == MergeTreeDataPartType::Compact) return getAdaptiveMrkSizeCompact(columns_num); - else if (type == MergeTreeDataPartType::InMemory) + else if (mark_type.part_type == MergeTreeDataPartType::InMemory) return 0; else throw Exception("Unknown part type", ErrorCodes::UNKNOWN_PART_TYPE); @@ -80,16 +134,4 @@ size_t getAdaptiveMrkSizeCompact(size_t columns_num) return sizeof(UInt64) * (columns_num * 2 + 1); } -std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type, bool compress_marks) -{ - if (part_type == MergeTreeDataPartType::Wide) - return compress_marks ? ".cmrk2" : ".mrk2"; - else if (part_type == MergeTreeDataPartType::Compact) - return compress_marks ? ".cmrk3" : ".mrk3"; - else if (part_type == MergeTreeDataPartType::InMemory) - return ""; - else - throw Exception("Unknown part type", ErrorCodes::UNKNOWN_PART_TYPE); -} - } diff --git a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h index 0bfd404c158..a8917a83a1e 100644 --- a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h +++ b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h @@ -11,15 +11,30 @@ namespace DB class MergeTreeData; + +/** Various types of mark files are stored in files with various extensions: + * .mrk, .mrk2, .mrk3, .cmrk, .cmrk2, .cmrk3. + * This helper allows to obtain mark type from file extension and vise versa. + */ +struct MarkType +{ + MarkType(std::string_view extension); + MarkType(bool adaptive_, bool compressed_, MergeTreeDataPartType::Value part_type_); + + static bool isMarkFileExtension(std::string_view extension); + std::string getFileExtension() const; + + bool adaptive = false; + bool compressed = false; + MergeTreeDataPartType::Value part_type = MergeTreeDataPartType::Unknown; +}; + + /// Meta information about index granularity struct MergeTreeIndexGranularityInfo { public: - /// Marks file extension '.mrk' or '.mrk2' - String marks_file_extension; - - /// Is stride in rows between marks non fixed? - bool is_adaptive = false; + MarkType mark_type; /// Fixed size in rows of one granule if index_granularity_bytes is zero size_t fixed_index_granularity = 0; @@ -27,38 +42,29 @@ public: /// Approximate bytes size of one granule size_t index_granularity_bytes = 0; - /// Whether to compress marks - bool compress_marks; - MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_); void changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage); String getMarksFilePath(const String & path_prefix) const { - return path_prefix + marks_file_extension; + return path_prefix + mark_type.getFileExtension(); } + /// TODO: This is non-passable (method overload), remove before merge. String getMarksFilePath(const DataPartStoragePtr & data_part_storage, const String & path_prefix) const { auto mrk_ext = getMarksExtensionFromFilesystem(data_part_storage); - return path_prefix + mrk_ext.value_or(marks_file_extension); + return path_prefix + mrk_ext.value_or(mark_type.getFileExtension()); } size_t getMarkSizeInBytes(size_t columns_num = 1) const; static std::optional getMarksExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage); - -private: - MergeTreeDataPartType type; - void setAdaptive(size_t index_granularity_bytes_); - void setNonAdaptive(); }; -constexpr inline auto getNonAdaptiveMrkExtension(bool compress_marks) { return compress_marks ? ".cmrk" : ".mrk"; } constexpr inline auto getNonAdaptiveMrkSizeWide() { return sizeof(UInt64) * 2; } constexpr inline auto getAdaptiveMrkSizeWide() { return sizeof(UInt64) * 3; } inline size_t getAdaptiveMrkSizeCompact(size_t columns_num); -std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type, bool compress_marks); } diff --git a/src/Storages/MergeTree/MergeTreeMarksLoader.cpp b/src/Storages/MergeTree/MergeTreeMarksLoader.cpp index 3d3532ff8d0..666695f485e 100644 --- a/src/Storages/MergeTree/MergeTreeMarksLoader.cpp +++ b/src/Storages/MergeTree/MergeTreeMarksLoader.cpp @@ -80,7 +80,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl() else reader = std::make_unique(std::move(buffer)); - if (!index_granularity_info.is_adaptive) + if (!index_granularity_info.mark_type.adaptive) { /// Read directly to marks. reader->readStrict(reinterpret_cast(res->data()), expected_uncompressed_size); diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp index a5bc189e42f..269a78977ad 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -33,7 +33,7 @@ MergedBlockOutputStream::MergedBlockOutputStream( storage.getContext()->getSettings(), write_settings, storage.getSettings(), - data_part->index_granularity_info.is_adaptive, + data_part->index_granularity_info.mark_type.adaptive, /* rewrite_primary_key = */ true, blocks_are_granules_size); diff --git a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp index 21e01223e2c..dd75cddd380 100644 --- a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp +++ b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp @@ -30,8 +30,8 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream( global_settings, data_part->storage.getContext()->getWriteSettings(), storage_settings, - index_granularity_info ? index_granularity_info->is_adaptive : data_part->storage.canUseAdaptiveGranularity(), - /* rewrite_primary_key = */false); + index_granularity_info ? index_granularity_info->mark_type.adaptive : data_part->storage.canUseAdaptiveGranularity(), + /* rewrite_primary_key = */ false); writer = data_part->getWriter( data_part_storage_builder, diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index dd54e9a5862..bd132b9c022 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1570,7 +1570,7 @@ bool MutateTask::prepare() ctx->new_data_part->partition.assign(ctx->source_part->partition); /// Don't change granularity type while mutating subset of columns - ctx->mrk_extension = ctx->source_part->index_granularity_info.marks_file_extension; + ctx->mrk_extension = ctx->source_part->index_granularity_info.mark_type.getFileExtension(); const auto data_settings = ctx->data->getSettings(); ctx->need_sync = needSyncPart(ctx->source_part->rows_count, ctx->source_part->getBytesOnDisk(), *data_settings); diff --git a/src/Storages/System/StorageSystemPartsColumns.cpp b/src/Storages/System/StorageSystemPartsColumns.cpp index 87a5afe2439..f18856f0c19 100644 --- a/src/Storages/System/StorageSystemPartsColumns.cpp +++ b/src/Storages/System/StorageSystemPartsColumns.cpp @@ -261,7 +261,7 @@ void StorageSystemPartsColumns::processNextStorage( size.data_uncompressed += bin_checksum->second.uncompressed_size; } - auto mrk_checksum = part->checksums.files.find(file_name + part->index_granularity_info.marks_file_extension); + auto mrk_checksum = part->checksums.files.find(file_name + part->index_granularity_info.mark_type.getFileExtension()); if (mrk_checksum != part->checksums.files.end()) size.marks += mrk_checksum->second.file_size; From fd2be1f04136aa0728157f2ae2c626e538919458 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 5 Sep 2022 07:31:29 +0200 Subject: [PATCH 4/5] Fix error --- src/Storages/MergeTree/IPartMetadataManager.cpp | 3 ++- src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Storages/MergeTree/IPartMetadataManager.cpp b/src/Storages/MergeTree/IPartMetadataManager.cpp index 8aecb3cd7d4..03fa3e3309e 100644 --- a/src/Storages/MergeTree/IPartMetadataManager.cpp +++ b/src/Storages/MergeTree/IPartMetadataManager.cpp @@ -14,7 +14,8 @@ IPartMetadataManager::IPartMetadataManager(const IMergeTreeDataPart * part_) : p bool IPartMetadataManager::isCompressedFromFileName(const String & file_name) { std::string extension = fs::path(file_name).extension(); - return MarkType(extension).compressed || isCompressedFromIndexExtension(extension); + return (MarkType::isMarkFileExtension(extension) && MarkType(extension).compressed) + || isCompressedFromIndexExtension(extension); } } diff --git a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp index 98a72eb13c0..56d0478d46c 100644 --- a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp @@ -28,7 +28,7 @@ MarkType::MarkType(std::string_view extension) } if (!extension.starts_with("mrk")) - throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Mark file extension does not start with .mrk or .cmrk"); + throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Mark file extension does not start with .mrk or .cmrk: {}", extension); extension = extension.substr(strlen("mrk")); From ad9ae84e76db2754bfaa7e3d513c72704f68cee8 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 5 Sep 2022 07:34:44 +0200 Subject: [PATCH 5/5] Fix error --- src/Storages/MergeTree/IMergeTreeDataPart.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index a0f20a4f09d..810bd334273 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -15,7 +15,6 @@ #include #include #include -#include #include #include #include