From d7127e4b2db6def090931a3cf9dc59cd9d90a9ab Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 5 Sep 2022 07:26:58 +0200 Subject: [PATCH] Make it slightly more sane --- .../QueryPlan/ReadFromMergeTree.cpp | 2 +- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 73 +--------- src/Storages/MergeTree/IMergeTreeDataPart.h | 20 +-- .../MergeTree/IMergedBlockOutputStream.cpp | 1 + src/Storages/MergeTree/MergeTreeData.cpp | 6 +- .../MergeTree/MergeTreeDataPartCompact.cpp | 12 +- .../MergeTree/MergeTreeDataPartWide.cpp | 70 +++++---- .../MergeTree/MergeTreeDataPartWriterWide.cpp | 2 +- .../MergeTreeIndexGranularityInfo.cpp | 134 ++++++++++++------ .../MergeTree/MergeTreeIndexGranularityInfo.h | 40 +++--- .../MergeTree/MergeTreeMarksLoader.cpp | 2 +- .../MergeTree/MergedBlockOutputStream.cpp | 2 +- .../MergedColumnOnlyOutputStream.cpp | 4 +- src/Storages/MergeTree/MutateTask.cpp | 2 +- .../System/StorageSystemPartsColumns.cpp | 2 +- 15 files changed, 162 insertions(+), 210 deletions(-) diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 1f6c6ee2a3f..a6100fe42c4 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -341,7 +341,7 @@ struct PartRangesReadInfo sum_marks_in_parts[i] = parts[i].getMarksCount(); sum_marks += sum_marks_in_parts[i]; - if (parts[i].data_part->index_granularity_info.is_adaptive) + if (parts[i].data_part->index_granularity_info.mark_type.adaptive) ++adaptive_parts; } diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index fb1f2672cac..d61b23c67a2 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -65,7 +65,6 @@ namespace ErrorCodes extern const int BAD_TTL_FILE; extern const int NOT_IMPLEMENTED; extern const int NO_SUCH_COLUMN_IN_TABLE; - extern const int INCORRECT_FILE_NAME; } @@ -1629,7 +1628,7 @@ void IMergeTreeDataPart::calculateSecondaryIndicesSizesOnDisk() auto index_name_escaped = escapeForFileName(index_name); auto index_file_name = index_name_escaped + index_ptr->getSerializedFileExtension(); - auto index_marks_file_name = index_name_escaped + index_granularity_info.marks_file_extension; + auto index_marks_file_name = index_name_escaped + getMarksFileExtension(); /// If part does not contain index auto bin_checksum = checksums.files.find(index_file_name); @@ -1809,74 +1808,4 @@ bool isCompressedFromIndexExtension(const String & index_extension) return index_extension == getIndexExtension(true); } -MarkType::MarkType(std::string_view extension) -{ - if (extension.starts_with('c')) - { - compressed = true; - extension = extension.substr(1); - } - - if (!extension.starts_with("mrk")) - throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Mark file extension does not start with .mrk or .cmrk"); - - extension = extension.substr(strlen("mrk")); - - if (extension.empty()) - { - adaptive = false; - part_type = MergeTreeDataPartType::Wide; - } - else if (extension == "2") - { - adaptive = true; - part_type = MergeTreeDataPartType::Wide; - } - else if (extension == "3") - { - adaptive = true; - part_type = MergeTreeDataPartType::Compact; - } - else - throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Unknown mark file extension: '{}'", extension); -} - -MarkType::MarkType(bool adaptive_, bool compressed_, MergeTreeDataPartType::Value part_type_) - : adaptive(adaptive_), compressed(compressed_), part_type(part_type_) -{ - if (!adaptive && part_type != MergeTreeDataPartType::Wide) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: non-Wide data part type with non-adaptive granularity"); - if (part_type == MergeTreeDataPartType::Unknown) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown data part type"); -} - -bool MarkType::isMarkFileExtension(std::string_view extension) -{ - return extension.find("mrk") != std::string_view::npos; -} - -std::string MarkType::getFileExtension() -{ - std::string res = compressed ? "cmrk" : "mrk"; - - if (!adaptive) - { - if (part_type != MergeTreeDataPartType::Wide) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: non-Wide data part type with non-adaptive granularity"); - return res; - } - - switch (part_type) - { - case MergeTreeDataPartType::Wide: - return res + "2"; - case MergeTreeDataPartType::Compact: - return res + "3"; - case MergeTreeDataPartType::InMemory: - return ""; - case MergeTreeDataPartType::Unknown: - throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown data part type"); - } -} - } diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index c5befb76def..a0f20a4f09d 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -160,7 +160,7 @@ public: void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency); void appendFilesOfColumnsChecksumsIndexes(Strings & files, bool include_projection = false) const; - String getMarksFileExtension() const { return index_granularity_info.marks_file_extension; } + String getMarksFileExtension() const { return index_granularity_info.mark_type.getFileExtension(); } /// Generate the new name for this part according to `new_part_info` and min/max dates from the old name. /// This is useful when you want to change e.g. block numbers or the mutation version of the part. @@ -588,22 +588,4 @@ inline String getIndexExtension(bool is_compressed_primary_key) { return is_comp std::optional getIndexExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage); bool isCompressedFromIndexExtension(const String & index_extension); - -/** Various types of mark files are stored in files with various extensions: - * .mrk, .mrk2, .mrk3, .cmrk, .cmrk2, .cmrk3. - * This helper allows to obtain mark type from file extension and vise versa. - */ -struct MarkType -{ - MarkType(std::string_view extension); - MarkType(bool adaptive_, bool compressed_, MergeTreeDataPartType::Value part_type_); - - static bool isMarkFileExtension(std::string_view extension); - std::string getFileExtension(); - - bool adaptive = false; - bool compressed = false; - MergeTreeDataPartType::Value part_type = MergeTreeDataPartType::Unknown; -}; - } diff --git a/src/Storages/MergeTree/IMergedBlockOutputStream.cpp b/src/Storages/MergeTree/IMergedBlockOutputStream.cpp index 9be49a9bba4..5af9bbd3ed8 100644 --- a/src/Storages/MergeTree/IMergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/IMergedBlockOutputStream.cpp @@ -106,6 +106,7 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart( if (remove_it != columns.end()) columns.erase(remove_it); } + return remove_files; } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index c166353b486..26241922512 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -1072,7 +1072,7 @@ void MergeTreeData::loadDataPartsFromDisk( suspicious_broken_parts_bytes += size_of_part; return; } - if (!part->index_granularity_info.is_adaptive) + if (!part->index_granularity_info.mark_type.adaptive) has_non_adaptive_parts.store(true, std::memory_order_relaxed); else has_adaptive_parts.store(true, std::memory_order_relaxed); @@ -6315,9 +6315,9 @@ bool MergeTreeData::canReplacePartition(const DataPartPtr & src_part) const if (!settings->enable_mixed_granularity_parts || settings->index_granularity_bytes == 0) { - if (!canUseAdaptiveGranularity() && src_part->index_granularity_info.is_adaptive) + if (!canUseAdaptiveGranularity() && src_part->index_granularity_info.mark_type.adaptive) return false; - if (canUseAdaptiveGranularity() && !src_part->index_granularity_info.is_adaptive) + if (canUseAdaptiveGranularity() && !src_part->index_granularity_info.mark_type.adaptive) return false; } return true; diff --git a/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp index 046a7d274c0..20f47d97d61 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp @@ -71,7 +71,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter( return std::make_unique( shared_from_this(), std::move(data_part_storage_builder), ordered_columns_list, metadata_snapshot, - indices_to_recalc, index_granularity_info.marks_file_extension, + indices_to_recalc, getMarksFileExtension(), default_codec_, writer_settings, computed_index_granularity); } @@ -85,19 +85,17 @@ void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*eac total_size.data_uncompressed += bin_checksum->second.uncompressed_size; } - auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + index_granularity_info.marks_file_extension); + auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + getMarksFileExtension()); if (mrk_checksum != checksums.files.end()) total_size.marks += mrk_checksum->second.file_size; } void MergeTreeDataPartCompact::loadIndexGranularity() { - //String full_path = getRelativePath(); - if (columns.empty()) throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART); - if (!index_granularity_info.is_adaptive) + if (!index_granularity_info.mark_type.adaptive) throw Exception("MergeTreeDataPartCompact cannot be created with non-adaptive granulary.", ErrorCodes::NOT_IMPLEMENTED); auto marks_file_path = index_granularity_info.getMarksFilePath("data"); @@ -131,7 +129,7 @@ bool MergeTreeDataPartCompact::hasColumnFiles(const NameAndTypePair & column) co return false; auto bin_checksum = checksums.files.find(DATA_FILE_NAME_WITH_EXTENSION); - auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + index_granularity_info.marks_file_extension); + auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + getMarksFileExtension()); return (bin_checksum != checksums.files.end() && mrk_checksum != checksums.files.end()); } @@ -139,7 +137,7 @@ bool MergeTreeDataPartCompact::hasColumnFiles(const NameAndTypePair & column) co void MergeTreeDataPartCompact::checkConsistency(bool require_part_metadata) const { checkConsistencyBase(); - String mrk_file_name = DATA_FILE_NAME + index_granularity_info.marks_file_extension; + String mrk_file_name = DATA_FILE_NAME + getMarksFileExtension(); if (!checksums.empty()) { diff --git a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp index c3ad0f0e7d1..e71059150f9 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp @@ -67,7 +67,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartWide::getWriter( return std::make_unique( shared_from_this(), data_part_storage_builder, columns_list, metadata_snapshot, indices_to_recalc, - index_granularity_info.marks_file_extension, + getMarksFileExtension(), default_codec_, writer_settings, computed_index_granularity); } @@ -95,7 +95,7 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl( size.data_uncompressed += bin_checksum->second.uncompressed_size; } - auto mrk_checksum = checksums.files.find(file_name + index_granularity_info.marks_file_extension); + auto mrk_checksum = checksums.files.find(file_name + getMarksFileExtension()); if (mrk_checksum != checksums.files.end()) size.marks += mrk_checksum->second.file_size; }); @@ -103,11 +103,11 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl( return size; } + void MergeTreeDataPartWide::loadIndexGranularity() { index_granularity_info.changeGranularityIfRequired(data_part_storage); - if (columns.empty()) throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART); @@ -119,55 +119,48 @@ void MergeTreeDataPartWide::loadIndexGranularity() std::string(fs::path(data_part_storage->getFullPath()) / marks_file_path)); size_t marks_file_size = data_part_storage->getFileSize(marks_file_path); - if (!index_granularity_info.compress_marks) - { - if (!index_granularity_info.is_adaptive) - { - size_t marks_count = marks_file_size / index_granularity_info.getMarkSizeInBytes(); - index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); /// all the same - } - else - { - auto buffer = data_part_storage->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt); - while (!buffer->eof()) - { - buffer->seek(sizeof(size_t) * 2, SEEK_CUR); /// skip offset_in_compressed file and offset_in_decompressed_block - size_t granularity; - readIntBinary(granularity, *buffer); - index_granularity.appendMark(granularity); - } - if (index_granularity.getMarksCount() * index_granularity_info.getMarkSizeInBytes() != marks_file_size) - throw Exception("Cannot read all marks from file " + data_part_storage->getFullPath() + "/" + marks_file_path, ErrorCodes::CANNOT_READ_ALL_DATA); - } + if (!index_granularity_info.mark_type.adaptive && !index_granularity_info.mark_type.compressed) + { + /// The most easy way - no need to read the file, everything is known from its size. + size_t marks_count = marks_file_size / index_granularity_info.getMarkSizeInBytes(); + index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); /// all the same } else { - CompressedReadBufferFromFile buffer( - data_part_storage->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt)); + auto marks_file = data_part_storage->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt); + + std::unique_ptr marks_reader; + if (!index_granularity_info.mark_type.compressed) + marks_reader = std::move(marks_file); + else + marks_reader = std::make_unique(std::move(marks_file)); - MarksInCompressedFile mark(1); size_t marks_count = 0; - while (!buffer.eof()) + while (!marks_reader->eof()) { - buffer.readStrict(reinterpret_cast(mark.data()), sizeof(size_t) * 2); /// skip offset_in_compressed file and offset_in_decompressed_block + MarkInCompressedFile mark; + size_t granularity; + + readBinary(mark.offset_in_compressed_file, *marks_reader); + readBinary(mark.offset_in_decompressed_block, *marks_reader); ++marks_count; - if (index_granularity_info.is_adaptive) + if (index_granularity_info.mark_type.adaptive) { - size_t granularity; - readIntBinary(granularity, buffer); + readIntBinary(granularity, *marks_reader); index_granularity.appendMark(granularity); } } - if (!index_granularity_info.is_adaptive) - index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); + if (!index_granularity_info.mark_type.adaptive) + index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); /// all the same } index_granularity.setInitialized(); } + bool MergeTreeDataPartWide::isStoredOnRemoteDisk() const { return data_part_storage->isStoredOnRemoteDisk(); @@ -186,7 +179,7 @@ MergeTreeDataPartWide::~MergeTreeDataPartWide() void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const { checkConsistencyBase(); - //String path = getRelativePath(); + std::string marks_file_extension = index_granularity_info.mark_type.getFileExtension(); if (!checksums.empty()) { @@ -197,7 +190,7 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const getSerialization(name_type.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path) { String file_name = ISerialization::getFileNameForStream(name_type, substream_path); - String mrk_file_name = file_name + index_granularity_info.marks_file_extension; + String mrk_file_name = file_name + marks_file_extension; String bin_file_name = file_name + DATA_FILE_EXTENSION; if (!checksums.files.contains(mrk_file_name)) @@ -223,7 +216,7 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const { getSerialization(name_type.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path) { - auto file_path = ISerialization::getFileNameForStream(name_type, substream_path) + index_granularity_info.marks_file_extension; + auto file_path = ISerialization::getFileNameForStream(name_type, substream_path) + marks_file_extension; /// Missing file is Ok for case when new column was added. if (data_part_storage->exists(file_path)) @@ -251,10 +244,11 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const bool MergeTreeDataPartWide::hasColumnFiles(const NameAndTypePair & column) const { - auto check_stream_exists = [this](const String & stream_name) + std::string marks_file_extension = index_granularity_info.mark_type.getFileExtension(); + auto check_stream_exists = [this, &marks_file_extension](const String & stream_name) { auto bin_checksum = checksums.files.find(stream_name + DATA_FILE_EXTENSION); - auto mrk_checksum = checksums.files.find(stream_name + index_granularity_info.marks_file_extension); + auto mrk_checksum = checksums.files.find(stream_name + marks_file_extension); return bin_checksum != checksums.files.end() && mrk_checksum != checksums.files.end(); }; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 14ff073a511..0bccc21e478 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -431,7 +431,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai auto mrk_file_in = data_part_storage->readFile(mrk_path, {}, std::nullopt, std::nullopt); std::unique_ptr mrk_in; - if (data_part->index_granularity_info.compress_marks) + if (data_part->index_granularity_info.mark_type.compressed) mrk_in = std::make_unique(std::move(mrk_file_in)); else mrk_in = std::move(mrk_file_in); diff --git a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp index 39d3f2d7d82..98a72eb13c0 100644 --- a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp @@ -9,10 +9,87 @@ namespace DB namespace ErrorCodes { + extern const int LOGICAL_ERROR; extern const int NOT_IMPLEMENTED; extern const int UNKNOWN_PART_TYPE; + extern const int INCORRECT_FILE_NAME; } + +MarkType::MarkType(std::string_view extension) +{ + if (extension.starts_with('.')) + extension = extension.substr(1); + + if (extension.starts_with('c')) + { + compressed = true; + extension = extension.substr(1); + } + + if (!extension.starts_with("mrk")) + throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Mark file extension does not start with .mrk or .cmrk"); + + extension = extension.substr(strlen("mrk")); + + if (extension.empty()) + { + adaptive = false; + part_type = MergeTreeDataPartType::Wide; + } + else if (extension == "2") + { + adaptive = true; + part_type = MergeTreeDataPartType::Wide; + } + else if (extension == "3") + { + adaptive = true; + part_type = MergeTreeDataPartType::Compact; + } + else + throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Unknown mark file extension: '{}'", extension); +} + +MarkType::MarkType(bool adaptive_, bool compressed_, MergeTreeDataPartType::Value part_type_) + : adaptive(adaptive_), compressed(compressed_), part_type(part_type_) +{ + if (!adaptive && part_type != MergeTreeDataPartType::Wide) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: non-Wide data part type with non-adaptive granularity"); + if (part_type == MergeTreeDataPartType::Unknown) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown data part type"); +} + +bool MarkType::isMarkFileExtension(std::string_view extension) +{ + return extension.find("mrk") != std::string_view::npos; +} + +std::string MarkType::getFileExtension() const +{ + std::string res = compressed ? ".cmrk" : ".mrk"; + + if (!adaptive) + { + if (part_type != MergeTreeDataPartType::Wide) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: non-Wide data part type with non-adaptive granularity"); + return res; + } + + switch (part_type) + { + case MergeTreeDataPartType::Wide: + return res + "2"; + case MergeTreeDataPartType::Compact: + return res + "3"; + case MergeTreeDataPartType::InMemory: + return ""; + case MergeTreeDataPartType::Unknown: + throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown data part type"); + } +} + + std::optional MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage) { if (data_part_storage->exists()) @@ -24,51 +101,28 @@ std::optional MergeTreeIndexGranularityInfo::getMarksExtensionFromF } MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_) - : type(type_) + : mark_type(storage.canUseAdaptiveGranularity(), storage.getSettings()->compress_marks, type_.getValue()) { - const auto storage_settings = storage.getSettings(); - fixed_index_granularity = storage_settings->index_granularity; - compress_marks = storage_settings->compress_marks; - - /// Granularity is fixed - if (!storage.canUseAdaptiveGranularity()) - { - if (type != MergeTreeDataPartType::Wide) - throw Exception("Only Wide parts can be used with non-adaptive granularity.", ErrorCodes::NOT_IMPLEMENTED); - setNonAdaptive(); - } - else - setAdaptive(storage_settings->index_granularity_bytes); + fixed_index_granularity = storage.getSettings()->index_granularity; } void MergeTreeIndexGranularityInfo::changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage) { auto mrk_ext = getMarksExtensionFromFilesystem(data_part_storage); - if (mrk_ext && *mrk_ext == getNonAdaptiveMrkExtension(compress_marks)) - setNonAdaptive(); -} - -void MergeTreeIndexGranularityInfo::setAdaptive(size_t index_granularity_bytes_) -{ - is_adaptive = true; - marks_file_extension = getAdaptiveMrkExtension(type, compress_marks); - index_granularity_bytes = index_granularity_bytes_; -} - -void MergeTreeIndexGranularityInfo::setNonAdaptive() -{ - is_adaptive = false; - marks_file_extension = getNonAdaptiveMrkExtension(compress_marks); - index_granularity_bytes = 0; + if (mrk_ext && !MarkType(*mrk_ext).adaptive) + { + mark_type.adaptive = false; + index_granularity_bytes = 0; + } } size_t MergeTreeIndexGranularityInfo::getMarkSizeInBytes(size_t columns_num) const { - if (type == MergeTreeDataPartType::Wide) - return is_adaptive ? getAdaptiveMrkSizeWide() : getNonAdaptiveMrkSizeWide(); - else if (type == MergeTreeDataPartType::Compact) + if (mark_type.part_type == MergeTreeDataPartType::Wide) + return mark_type.adaptive ? getAdaptiveMrkSizeWide() : getNonAdaptiveMrkSizeWide(); + else if (mark_type.part_type == MergeTreeDataPartType::Compact) return getAdaptiveMrkSizeCompact(columns_num); - else if (type == MergeTreeDataPartType::InMemory) + else if (mark_type.part_type == MergeTreeDataPartType::InMemory) return 0; else throw Exception("Unknown part type", ErrorCodes::UNKNOWN_PART_TYPE); @@ -80,16 +134,4 @@ size_t getAdaptiveMrkSizeCompact(size_t columns_num) return sizeof(UInt64) * (columns_num * 2 + 1); } -std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type, bool compress_marks) -{ - if (part_type == MergeTreeDataPartType::Wide) - return compress_marks ? ".cmrk2" : ".mrk2"; - else if (part_type == MergeTreeDataPartType::Compact) - return compress_marks ? ".cmrk3" : ".mrk3"; - else if (part_type == MergeTreeDataPartType::InMemory) - return ""; - else - throw Exception("Unknown part type", ErrorCodes::UNKNOWN_PART_TYPE); -} - } diff --git a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h index 0bfd404c158..a8917a83a1e 100644 --- a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h +++ b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h @@ -11,15 +11,30 @@ namespace DB class MergeTreeData; + +/** Various types of mark files are stored in files with various extensions: + * .mrk, .mrk2, .mrk3, .cmrk, .cmrk2, .cmrk3. + * This helper allows to obtain mark type from file extension and vise versa. + */ +struct MarkType +{ + MarkType(std::string_view extension); + MarkType(bool adaptive_, bool compressed_, MergeTreeDataPartType::Value part_type_); + + static bool isMarkFileExtension(std::string_view extension); + std::string getFileExtension() const; + + bool adaptive = false; + bool compressed = false; + MergeTreeDataPartType::Value part_type = MergeTreeDataPartType::Unknown; +}; + + /// Meta information about index granularity struct MergeTreeIndexGranularityInfo { public: - /// Marks file extension '.mrk' or '.mrk2' - String marks_file_extension; - - /// Is stride in rows between marks non fixed? - bool is_adaptive = false; + MarkType mark_type; /// Fixed size in rows of one granule if index_granularity_bytes is zero size_t fixed_index_granularity = 0; @@ -27,38 +42,29 @@ public: /// Approximate bytes size of one granule size_t index_granularity_bytes = 0; - /// Whether to compress marks - bool compress_marks; - MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_); void changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage); String getMarksFilePath(const String & path_prefix) const { - return path_prefix + marks_file_extension; + return path_prefix + mark_type.getFileExtension(); } + /// TODO: This is non-passable (method overload), remove before merge. String getMarksFilePath(const DataPartStoragePtr & data_part_storage, const String & path_prefix) const { auto mrk_ext = getMarksExtensionFromFilesystem(data_part_storage); - return path_prefix + mrk_ext.value_or(marks_file_extension); + return path_prefix + mrk_ext.value_or(mark_type.getFileExtension()); } size_t getMarkSizeInBytes(size_t columns_num = 1) const; static std::optional getMarksExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage); - -private: - MergeTreeDataPartType type; - void setAdaptive(size_t index_granularity_bytes_); - void setNonAdaptive(); }; -constexpr inline auto getNonAdaptiveMrkExtension(bool compress_marks) { return compress_marks ? ".cmrk" : ".mrk"; } constexpr inline auto getNonAdaptiveMrkSizeWide() { return sizeof(UInt64) * 2; } constexpr inline auto getAdaptiveMrkSizeWide() { return sizeof(UInt64) * 3; } inline size_t getAdaptiveMrkSizeCompact(size_t columns_num); -std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type, bool compress_marks); } diff --git a/src/Storages/MergeTree/MergeTreeMarksLoader.cpp b/src/Storages/MergeTree/MergeTreeMarksLoader.cpp index 3d3532ff8d0..666695f485e 100644 --- a/src/Storages/MergeTree/MergeTreeMarksLoader.cpp +++ b/src/Storages/MergeTree/MergeTreeMarksLoader.cpp @@ -80,7 +80,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl() else reader = std::make_unique(std::move(buffer)); - if (!index_granularity_info.is_adaptive) + if (!index_granularity_info.mark_type.adaptive) { /// Read directly to marks. reader->readStrict(reinterpret_cast(res->data()), expected_uncompressed_size); diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp index a5bc189e42f..269a78977ad 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -33,7 +33,7 @@ MergedBlockOutputStream::MergedBlockOutputStream( storage.getContext()->getSettings(), write_settings, storage.getSettings(), - data_part->index_granularity_info.is_adaptive, + data_part->index_granularity_info.mark_type.adaptive, /* rewrite_primary_key = */ true, blocks_are_granules_size); diff --git a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp index 21e01223e2c..dd75cddd380 100644 --- a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp +++ b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp @@ -30,8 +30,8 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream( global_settings, data_part->storage.getContext()->getWriteSettings(), storage_settings, - index_granularity_info ? index_granularity_info->is_adaptive : data_part->storage.canUseAdaptiveGranularity(), - /* rewrite_primary_key = */false); + index_granularity_info ? index_granularity_info->mark_type.adaptive : data_part->storage.canUseAdaptiveGranularity(), + /* rewrite_primary_key = */ false); writer = data_part->getWriter( data_part_storage_builder, diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index dd54e9a5862..bd132b9c022 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1570,7 +1570,7 @@ bool MutateTask::prepare() ctx->new_data_part->partition.assign(ctx->source_part->partition); /// Don't change granularity type while mutating subset of columns - ctx->mrk_extension = ctx->source_part->index_granularity_info.marks_file_extension; + ctx->mrk_extension = ctx->source_part->index_granularity_info.mark_type.getFileExtension(); const auto data_settings = ctx->data->getSettings(); ctx->need_sync = needSyncPart(ctx->source_part->rows_count, ctx->source_part->getBytesOnDisk(), *data_settings); diff --git a/src/Storages/System/StorageSystemPartsColumns.cpp b/src/Storages/System/StorageSystemPartsColumns.cpp index 87a5afe2439..f18856f0c19 100644 --- a/src/Storages/System/StorageSystemPartsColumns.cpp +++ b/src/Storages/System/StorageSystemPartsColumns.cpp @@ -261,7 +261,7 @@ void StorageSystemPartsColumns::processNextStorage( size.data_uncompressed += bin_checksum->second.uncompressed_size; } - auto mrk_checksum = part->checksums.files.find(file_name + part->index_granularity_info.marks_file_extension); + auto mrk_checksum = part->checksums.files.find(file_name + part->index_granularity_info.mark_type.getFileExtension()); if (mrk_checksum != part->checksums.files.end()) size.marks += mrk_checksum->second.file_size;