diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
index 188a159c4b4..6b8d7b38451 100644
--- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp
+++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
@@ -740,7 +740,7 @@ void IMergeTreeDataPart::loadIndex()
         loaded_index[i]->reserve(index_granularity.getMarksCount());
     }
 
-    String index_name = "primary.idx";
+    String index_name = "primary" + getIndexExtensionFromFilesystem(data_part_storage).value();
     String index_path = fs::path(data_part_storage->getRelativePath()) / index_name;
     auto index_file = metadata_manager->read(index_name);
     size_t marks_count = index_granularity.getMarksCount();
@@ -779,7 +779,10 @@ void IMergeTreeDataPart::appendFilesOfIndex(Strings & files) const
         return;
 
     if (metadata_snapshot->hasPrimaryKey())
-        files.push_back("primary.idx");
+    {
+        String index_name = "primary" + getIndexExtensionFromFilesystem(data_part_storage).value();
+        files.push_back(index_name);
+    }
 }
 
 NameSet IMergeTreeDataPart::getFileNamesWithoutChecksums() const
@@ -1527,8 +1530,11 @@ void IMergeTreeDataPart::checkConsistencyBase() const
     const auto & partition_key = metadata_snapshot->getPartitionKey();
     if (!checksums.empty())
     {
-        if (!pk.column_names.empty() && !checksums.files.contains("primary.idx"))
-            throw Exception("No checksum for primary.idx", ErrorCodes::NO_FILE_IN_DATA_PART);
+        if (!pk.column_names.empty()
+            && (!checksums.files.contains("primary" + getIndexExtension(false))
+                && !checksums.files.contains("primary" + getIndexExtension(true))))
+            throw Exception("No checksum for " + toString("primary" + getIndexExtension(false)) + " or " + toString("primary" + getIndexExtension(true)),
+                ErrorCodes::NO_FILE_IN_DATA_PART);
 
         if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
         {
@@ -1566,7 +1572,10 @@ void IMergeTreeDataPart::checkConsistencyBase() const
             /// Check that the primary key index is not empty.
             if (!pk.column_names.empty())
-                check_file_not_empty("primary.idx");
+            {
+                String index_name = "primary" + getIndexExtensionFromFilesystem(data_part_storage).value();
+                check_file_not_empty(index_name);
+            }
 
             if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
             {
@@ -1779,4 +1788,32 @@ bool isInMemoryPart(const MergeTreeDataPartPtr & data_part)
     return (data_part && data_part->getType() == MergeTreeDataPartType::InMemory);
 }
 
+std::optional<String> getIndexExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage)
+{
+    if (data_part_storage->exists())
+    {
+        for (auto it = data_part_storage->iterate(); it->isValid(); it->next())
+        {
+            const auto & extension = fs::path(it->name()).extension();
+            if (extension == getIndexExtension(false)
+                || extension == getIndexExtension(true))
+                return extension;
+        }
+    }
+    return {".idx"};
+}
+
+bool isCompressFromIndexExtension(const String & index_extension)
+{
+    return index_extension == getIndexExtension(true);
+}
+
+bool isCompressFromMrkExtension(const String & mrk_extension)
+{
+    return mrk_extension == getNonAdaptiveMrkExtension(true)
+        || mrk_extension == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, true)
+        || mrk_extension == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, true)
+        || mrk_extension == getAdaptiveMrkExtension(MergeTreeDataPartType::InMemory, true);
+}
+
 }
diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h
index 2a5e95e23d1..f3ddc116b15 100644
--- a/src/Storages/MergeTree/IMergeTreeDataPart.h
+++ b/src/Storages/MergeTree/IMergeTreeDataPart.h
@@ -605,5 +605,9 @@ using MergeTreeMutableDataPartPtr = std::shared_ptr<IMergeTreeDataPart>;
 bool isCompactPart(const MergeTreeDataPartPtr & data_part);
 bool isWidePart(const MergeTreeDataPartPtr & data_part);
 bool isInMemoryPart(const MergeTreeDataPartPtr & data_part);
+inline String getIndexExtension(bool is_compress_primary_key) { return is_compress_primary_key ? ".cidx" : ".idx"; }
+std::optional<String> getIndexExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage);
+bool isCompressFromIndexExtension(const String & index_extension);
+bool isCompressFromMrkExtension(const String & mrk_extension);
 
 }
diff --git a/src/Storages/MergeTree/IPartMetadataManager.cpp b/src/Storages/MergeTree/IPartMetadataManager.cpp
index d09fc9d4244..448dd7433a4 100644
--- a/src/Storages/MergeTree/IPartMetadataManager.cpp
+++ b/src/Storages/MergeTree/IPartMetadataManager.cpp
@@ -8,4 +8,14 @@ namespace DB
 IPartMetadataManager::IPartMetadataManager(const IMergeTreeDataPart * part_) : part(part_)
 {
 }
+
+bool IPartMetadataManager::isCompressFromFileName(const String & file_name) const
+{
+    const auto & extension = fs::path(file_name).extension();
+    if (isCompressFromMrkExtension(extension)
+        || isCompressFromIndexExtension(extension))
+        return true;
+
+    return false;
+}
 }
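The helpers above amount to a naming convention: compressed variants of the index and mark files carry a 'c'-prefixed extension (.cidx, .cmrk, .cmrk2, .cmrk3), and readers decide on decompression from the file name alone. A minimal standalone sketch of that detection logic, using std::filesystem in place of ClickHouse's fs alias (function names mirror the patch; the set-based lookup is illustrative):

    #include <cassert>
    #include <filesystem>
    #include <string>
    #include <unordered_set>

    // Compressed variants simply prepend 'c' to the uncompressed extension.
    std::string getIndexExtension(bool is_compress_primary_key)
    {
        return is_compress_primary_key ? ".cidx" : ".idx";
    }

    bool isCompressFromFileName(const std::string & file_name)
    {
        static const std::unordered_set<std::string> compressed_extensions{".cidx", ".cmrk", ".cmrk2", ".cmrk3"};
        return compressed_extensions.count(std::filesystem::path(file_name).extension().string()) > 0;
    }

    int main()
    {
        assert(getIndexExtension(true) == ".cidx");
        assert(isCompressFromFileName("primary.cidx"));
        assert(isCompressFromFileName("col.cmrk2"));
        assert(!isCompressFromFileName("primary.idx"));
        assert(!isCompressFromFileName("col.mrk2"));
    }

Deriving compression from the extension keeps old parts readable: a part written before this feature simply has no 'c'-prefixed files, so every reader falls back to the uncompressed path (and getIndexExtensionFromFilesystem defaults to ".idx").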
diff --git a/src/Storages/MergeTree/IPartMetadataManager.h b/src/Storages/MergeTree/IPartMetadataManager.h
index c1bf3b15805..265637008ce 100644
--- a/src/Storages/MergeTree/IPartMetadataManager.h
+++ b/src/Storages/MergeTree/IPartMetadataManager.h
@@ -9,6 +9,7 @@ namespace DB
 class IMergeTreeDataPart;
 
+class ReadBuffer;
 class SeekableReadBuffer;
 class IDisk;
 
@@ -29,8 +30,8 @@ public:
     virtual ~IPartMetadataManager() = default;
 
-    /// Read metadata content and return SeekableReadBuffer object.
-    virtual std::unique_ptr<SeekableReadBuffer> read(const String & file_name) const = 0;
+    /// Read metadata content and return ReadBuffer object.
+    virtual std::unique_ptr<ReadBuffer> read(const String & file_name) const = 0;
 
     /// Return true if metadata exists in part.
     virtual bool exists(const String & file_name) const = 0;
@@ -50,6 +51,9 @@ public:
     /// Check all metadatas in part.
     virtual std::unordered_map<String, uint128> check() const = 0;
 
+    /// Determine whether a file is compressed, judging by its extension.
+    bool isCompressFromFileName(const String & file_name) const;
+
 protected:
     const IMergeTreeDataPart * part;
 };
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index 07f73759014..ad43a339d82 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -2601,11 +2601,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name,
 
 static MergeTreeDataPartType getPartTypeFromMarkExtension(const String & mrk_ext)
 {
-    if (mrk_ext == getNonAdaptiveMrkExtension())
+    if (mrk_ext == getNonAdaptiveMrkExtension(true)
+        || mrk_ext == getNonAdaptiveMrkExtension(false))
         return MergeTreeDataPartType::Wide;
-    if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide))
+    if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, true)
+        || mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, false))
         return MergeTreeDataPartType::Wide;
-    if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact))
+    if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, true)
+        || mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, false))
         return MergeTreeDataPartType::Compact;
 
     throw Exception("Can't determine part type, because of unknown mark extension " + mrk_ext, ErrorCodes::UNKNOWN_PART_TYPE);
diff --git a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp
index c7b6ff0c4dd..e80e8534007 100644
--- a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp
@@ -119,27 +119,50 @@ void MergeTreeDataPartWide::loadIndexGranularity()
             std::string(fs::path(data_part_storage->getFullPath()) / marks_file_path));
 
     size_t marks_file_size = data_part_storage->getFileSize(marks_file_path);
-
-    if (!index_granularity_info.is_adaptive)
+    if (!index_granularity_info.is_compress_marks)
     {
-        size_t marks_count = marks_file_size / index_granularity_info.getMarkSizeInBytes();
-        index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); /// all the same
+        if (!index_granularity_info.is_adaptive)
+        {
+            size_t marks_count = marks_file_size / index_granularity_info.getMarkSizeInBytes();
+            index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); /// all the same
+        }
+        else
+        {
+            auto buffer = data_part_storage->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt);
+            while (!buffer->eof())
+            {
+                buffer->seek(sizeof(size_t) * 2, SEEK_CUR); /// skip offset_in_compressed file and offset_in_decompressed_block
+                size_t granularity;
+                readIntBinary(granularity, *buffer);
+                index_granularity.appendMark(granularity);
+            }
+
+            if (index_granularity.getMarksCount() * index_granularity_info.getMarkSizeInBytes() != marks_file_size)
+                throw Exception("Cannot read all marks from file " + data_part_storage->getFullPath() + "/" + marks_file_path, ErrorCodes::CANNOT_READ_ALL_DATA);
+        }
     }
     else
     {
-        auto buffer = data_part_storage->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt);
-        while (!buffer->eof())
+        CompressedReadBufferFromFile buffer(
+            data_part_storage->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt));
+
+        MarksInCompressedFile mark(1);
+        size_t marks_count = 0;
+        while (!buffer.eof())
         {
-            buffer->seek(sizeof(size_t) * 2, SEEK_CUR); /// skip offset_in_compressed file and offset_in_decompressed_block
-            size_t granularity;
-            readIntBinary(granularity, *buffer);
-            index_granularity.appendMark(granularity);
+            buffer.readStrict(reinterpret_cast<char *>(mark.data()), sizeof(size_t) * 2); /// skip offset_in_compressed file and offset_in_decompressed_block
+            ++marks_count;
+
+            if (index_granularity_info.is_adaptive)
+            {
+                size_t granularity;
+                readIntBinary(granularity, buffer);
+                index_granularity.appendMark(granularity);
+            }
         }
 
-        if (index_granularity.getMarksCount() * index_granularity_info.getMarkSizeInBytes() != marks_file_size)
-            throw Exception(
-                ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all marks from file {}",
-                std::string(fs::path(data_part_storage->getFullPath()) / marks_file_path));
+        if (!index_granularity_info.is_adaptive)
+            index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity);
     }
 
     index_granularity.setInitialized();
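Both branches above parse the same logical record — per mark, two UInt64 offsets plus, when granularity is adaptive, a third UInt64 with the number of rows in the granule — and only the transport (plain file vs. CompressedReadBufferFromFile) differs. A self-contained sketch of decoding that record from an already-decompressed byte buffer, assuming a little-endian host as the raw writeIntBinary/readIntBinary byte layout implies (struct and function names are illustrative):

    #include <cassert>
    #include <cstdint>
    #include <cstring>
    #include <vector>

    // One adaptive mark as serialized on disk (.mrk2, or .cmrk2 after decompression): 24 bytes.
    struct Mark
    {
        uint64_t offset_in_compressed_file;
        uint64_t offset_in_decompressed_block;
        uint64_t rows_in_granule;
    };

    std::vector<Mark> decodeAdaptiveMarks(const std::vector<char> & bytes)
    {
        std::vector<Mark> marks;
        size_t pos = 0;
        while (pos + sizeof(Mark) <= bytes.size())
        {
            Mark m;
            std::memcpy(&m, bytes.data() + pos, sizeof(Mark));
            pos += sizeof(Mark);
            marks.push_back(m);
        }
        // Mirrors the "Cannot read all marks" check: trailing bytes mean corruption.
        assert(pos == bytes.size());
        return marks;
    }

    int main()
    {
        std::vector<char> bytes(2 * sizeof(Mark), 0);
        Mark m{0, 0, 8192};
        std::memcpy(bytes.data(), &m, sizeof(Mark));
        std::memcpy(bytes.data() + sizeof(Mark), &m, sizeof(Mark));
        auto marks = decodeAdaptiveMarks(bytes);
        assert(marks.size() == 2 && marks[1].rows_in_granule == 8192);
    }

This also explains the structural change in loadIndexGranularity: for compressed marks the marks-count-times-mark-size sanity check against the file size no longer holds (the on-disk size depends on the codec), so the compressed branch counts marks while reading instead.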
diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp
index 56ebadc082c..442ca10c22c 100644
--- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp
@@ -16,7 +16,11 @@ void MergeTreeDataPartWriterOnDisk::Stream::preFinalize()
     compressed.next();
     /// 'compressed_buf' doesn't call next() on underlying buffer ('plain_hashing'). We should do it manually.
     plain_hashing.next();
-    marks.next();
+
+    if (is_compress_marks)
+        marks_compressed.next();
+
+    marks_hashing.next();
 
     plain_file->preFinalize();
     marks_file->preFinalize();
@@ -48,6 +52,8 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream(
     const std::string & marks_file_extension_,
     const CompressionCodecPtr & compression_codec_,
     size_t max_compress_block_size_,
+    const CompressionCodecPtr & marks_compression_codec_,
+    size_t marks_compress_block_size_,
     const WriteSettings & query_write_settings)
     : escaped_column_name(escaped_column_name_),
     data_file_extension{data_file_extension_},
@@ -56,7 +62,11 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream(
     plain_hashing(*plain_file),
     compressed_buf(plain_hashing, compression_codec_, max_compress_block_size_),
     compressed(compressed_buf),
-    marks_file(data_part_storage_builder->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings)), marks(*marks_file)
+    marks_file(data_part_storage_builder->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings)),
+    marks_hashing(*marks_file),
+    marks_compressed_buf(marks_hashing, marks_compression_codec_, marks_compress_block_size_),
+    marks_compressed(marks_compressed_buf),
+    is_compress_marks(isCompressFromMrkExtension(marks_file_extension))
 {
 }
 
@@ -70,8 +80,14 @@ void MergeTreeDataPartWriterOnDisk::Stream::addToChecksums(MergeTreeData::DataPa
     checksums.files[name + data_file_extension].file_size = plain_hashing.count();
     checksums.files[name + data_file_extension].file_hash = plain_hashing.getHash();
 
-    checksums.files[name + marks_file_extension].file_size = marks.count();
-    checksums.files[name + marks_file_extension].file_hash = marks.getHash();
+    if (is_compress_marks)
+    {
+        checksums.files[name + marks_file_extension].is_compressed = true;
+        checksums.files[name + marks_file_extension].uncompressed_size = marks_compressed.count();
+        checksums.files[name + marks_file_extension].uncompressed_hash = marks_compressed.getHash();
+    }
+    checksums.files[name + marks_file_extension].file_size = marks_hashing.count();
+    checksums.files[name + marks_file_extension].file_hash = marks_hashing.getHash();
 }
 
 
@@ -91,6 +107,7 @@ MergeTreeDataPartWriterOnDisk::MergeTreeDataPartWriterOnDisk(
     , marks_file_extension(marks_file_extension_)
     , default_codec(default_codec_)
     , compute_granularity(index_granularity.empty())
+    , is_compress_primary_key(settings.is_compress_primary_key)
 {
     if (settings.blocks_are_granules_size && !index_granularity.empty())
         throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
@@ -156,13 +173,27 @@ void MergeTreeDataPartWriterOnDisk::initPrimaryIndex()
 {
     if (metadata_snapshot->hasPrimaryKey())
     {
-        index_file_stream = data_part_storage_builder->writeFile("primary.idx", DBMS_DEFAULT_BUFFER_SIZE, settings.query_write_settings);
-        index_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
+        String index_name = "primary" + getIndexExtension(is_compress_primary_key);
+        index_file_stream = data_part_storage_builder->writeFile(index_name, DBMS_DEFAULT_BUFFER_SIZE, settings.query_write_settings);
+        index_hashing_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
+
+        if (is_compress_primary_key)
+        {
+            ParserCodec codec_parser;
+            auto ast = parseQuery(codec_parser, "(" + Poco::toUpper(settings.primary_key_compression_codec) + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
+            CompressionCodecPtr primary_key_compression_codec = CompressionCodecFactory::instance().get(ast, nullptr);
+            index_compressed_buf = std::make_unique<CompressedWriteBuffer>(*index_hashing_stream, primary_key_compression_codec, settings.primary_key_compress_block_size);
+            index_compressed_stream = std::make_unique<HashingWriteBuffer>(*index_compressed_buf);
+        }
     }
 }
 
 void MergeTreeDataPartWriterOnDisk::initSkipIndices()
 {
+    ParserCodec codec_parser;
+    auto ast = parseQuery(codec_parser, "(" + Poco::toUpper(settings.marks_compression_codec) + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
+    CompressionCodecPtr marks_compression_codec = CompressionCodecFactory::instance().get(ast, nullptr);
+
     for (const auto & index_helper : skip_indices)
     {
         String stream_name = index_helper->getFileName();
@@ -172,7 +203,9 @@ void MergeTreeDataPartWriterOnDisk::initSkipIndices()
             data_part_storage_builder,
             stream_name, index_helper->getSerializedFileExtension(),
             stream_name, marks_file_extension,
-            default_codec, settings.max_compress_block_size, settings.query_write_settings));
+            default_codec, settings.max_compress_block_size,
+            marks_compression_codec, settings.marks_compress_block_size,
+            settings.query_write_settings));
         skip_indices_aggregators.push_back(index_helper->createIndexAggregator());
         skip_index_accumulated_marks.push_back(0);
     }
@@ -208,7 +241,8 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Bloc
         {
             const auto & primary_column = primary_index_block.getByPosition(j);
             index_columns[j]->insertFrom(*primary_column.column, granule.start_row);
-            primary_column.type->getDefaultSerialization()->serializeBinary(*primary_column.column, granule.start_row, *index_stream);
+            primary_column.type->getDefaultSerialization()->serializeBinary(
+                *primary_column.column, granule.start_row, is_compress_primary_key ? *index_compressed_stream : *index_hashing_stream);
         }
     }
 }
@@ -241,12 +275,12 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
                 if (stream.compressed.offset() >= settings.min_compress_block_size)
                     stream.compressed.next();
 
-                writeIntBinary(stream.plain_hashing.count(), stream.marks);
-                writeIntBinary(stream.compressed.offset(), stream.marks);
+                writeIntBinary(stream.plain_hashing.count(), stream.is_compress_marks ? stream.marks_compressed : stream.marks_hashing);
+                writeIntBinary(stream.compressed.offset(), stream.is_compress_marks ? stream.marks_compressed : stream.marks_hashing);
                 /// Actually this numbers is redundant, but we have to store them
                 /// to be compatible with normal .mrk2 file format
                 if (settings.can_use_adaptive_granularity)
-                    writeIntBinary(1UL, stream.marks);
+                    writeIntBinary(1UL, stream.is_compress_marks ? stream.marks_compressed : stream.marks_hashing);
             }
 
             size_t pos = granule.start_row;
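Note how the writer obtains the marks and primary-key codecs: it round-trips the setting string through the SQL parser, so "(ZSTD(3))" is parsed as a codec expression and handed to CompressionCodecFactory. A toy parser illustrating the accepted shape of these setting values (illustrative only; ClickHouse uses ParserCodec, so anything the CODEC grammar accepts should work here):

    #include <cassert>
    #include <string>

    struct CodecSpec { std::string name; int level = 0; bool has_level = false; };

    // Parse specs like "ZSTD(3)" or "LZ4" into a name and an optional level.
    CodecSpec parseCodecSpec(const std::string & s)
    {
        CodecSpec spec;
        auto open = s.find('(');
        if (open == std::string::npos) { spec.name = s; return spec; }
        spec.name = s.substr(0, open);
        auto close = s.find(')', open);
        assert(close != std::string::npos && "unbalanced codec spec");
        spec.level = std::stoi(s.substr(open + 1, close - open - 1));
        spec.has_level = true;
        return spec;
    }

    int main()
    {
        auto zstd = parseCodecSpec("ZSTD(3)");
        assert(zstd.name == "ZSTD" && zstd.level == 3);
        auto lz4 = parseCodecSpec("LZ4");
        assert(lz4.name == "LZ4" && !lz4.has_level);
    }

Reusing the SQL parser avoids a second, hand-rolled grammar for marks_compression_codec and primary_key_compression_codec values.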
@@ -263,7 +297,7 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat
     if (write_final_mark && compute_granularity)
         index_granularity.appendMark(0);
 
-    if (index_stream)
+    if (index_hashing_stream)
     {
         if (write_final_mark)
         {
@@ -272,26 +306,44 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat
                 const auto & column = *last_block_index_columns[j];
                 size_t last_row_number = column.size() - 1;
                 index_columns[j]->insertFrom(column, last_row_number);
-                index_types[j]->getDefaultSerialization()->serializeBinary(column, last_row_number, *index_stream);
+                index_types[j]->getDefaultSerialization()->serializeBinary(
+                    column, last_row_number, is_compress_primary_key ? *index_compressed_stream : *index_hashing_stream);
             }
             last_block_index_columns.clear();
         }
 
-        index_stream->next();
-        checksums.files["primary.idx"].file_size = index_stream->count();
-        checksums.files["primary.idx"].file_hash = index_stream->getHash();
+        if (is_compress_primary_key)
+            index_compressed_stream->next();
+
+        index_hashing_stream->next();
+
+        String index_name = "primary" + getIndexExtension(is_compress_primary_key);
+        if (is_compress_primary_key)
+        {
+            checksums.files[index_name].is_compressed = true;
+            checksums.files[index_name].uncompressed_size = index_compressed_stream->count();
+            checksums.files[index_name].uncompressed_hash = index_compressed_stream->getHash();
+        }
+        checksums.files[index_name].file_size = index_hashing_stream->count();
+        checksums.files[index_name].file_hash = index_hashing_stream->getHash();
+
         index_file_stream->preFinalize();
     }
 }
 
 void MergeTreeDataPartWriterOnDisk::finishPrimaryIndexSerialization(bool sync)
 {
-    if (index_stream)
+    if (index_hashing_stream)
     {
         index_file_stream->finalize();
         if (sync)
             index_file_stream->sync();
-        index_stream = nullptr;
+
+        if (is_compress_primary_key)
+        {
+            index_compressed_stream = nullptr;
+            index_compressed_buf = nullptr;
+        }
+        index_hashing_stream = nullptr;
     }
 }
diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h
index 7cc53db8066..557893f320f 100644
--- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h
+++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h
@@ -8,7 +8,8 @@
 #include
 #include
 #include
-
+#include
+#include
 
 namespace DB
 {
@@ -56,6 +57,8 @@ public:
             const std::string & marks_file_extension_,
             const CompressionCodecPtr & compression_codec_,
             size_t max_compress_block_size_,
+            const CompressionCodecPtr & marks_compression_codec_,
+            size_t marks_compress_block_size_,
             const WriteSettings & query_write_settings);
 
         String escaped_column_name;
@@ -68,9 +71,12 @@ public:
         CompressedWriteBuffer compressed_buf;
         HashingWriteBuffer compressed;
 
-        /// marks -> marks_file
+        /// marks_compressed -> marks_compressed_buf -> marks_hashing -> marks_file
         std::unique_ptr<WriteBufferFromFileBase> marks_file;
-        HashingWriteBuffer marks;
+        HashingWriteBuffer marks_hashing;
+        CompressedWriteBuffer marks_compressed_buf;
+        HashingWriteBuffer marks_compressed;
+        bool is_compress_marks;
 
         bool is_prefinalized = false;
 
@@ -139,7 +145,11 @@ protected:
     std::vector<size_t> skip_index_accumulated_marks;
 
     std::unique_ptr<WriteBufferFromFileBase> index_file_stream;
-    std::unique_ptr<HashingWriteBuffer> index_stream;
+    std::unique_ptr<HashingWriteBuffer> index_hashing_stream;
+    std::unique_ptr<CompressedWriteBuffer> index_compressed_buf;
+    std::unique_ptr<HashingWriteBuffer> index_compressed_stream;
+    bool is_compress_primary_key;
+
     DataTypes index_types;
     /// Index columns from the last block
     /// It's written to index file in the `writeSuffixAndFinalizePart` method
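With compression enabled, each marks or index file effectively has two identities: the compressed bytes on disk (hashed by marks_hashing / index_hashing_stream) and the logical stream before compression (hashed by marks_compressed / index_compressed_stream). A sketch of the resulting checksum entry, with the 128-bit hash ClickHouse actually uses simplified to uint64_t (field names follow the checksums code above; everything else is illustrative):

    #include <cassert>
    #include <cstdint>
    #include <map>
    #include <string>

    struct FileChecksum
    {
        bool is_compressed = false;
        uint64_t file_size = 0;          // compressed bytes on disk (from marks_hashing)
        uint64_t file_hash = 0;
        uint64_t uncompressed_size = 0;  // logical bytes (from marks_compressed), only if is_compressed
        uint64_t uncompressed_hash = 0;
    };

    int main()
    {
        std::map<std::string, FileChecksum> files;

        auto & entry = files["col.cmrk2"];
        entry.is_compressed = true;
        entry.uncompressed_size = 24 * 1000;  // e.g. 1000 adaptive marks, 24 bytes each
        entry.file_size = 800;                // whatever the codec produced
        assert(files.at("col.cmrk2").is_compressed);
        assert(files.at("col.cmrk2").uncompressed_size > files.at("col.cmrk2").file_size);
    }

Storing both sizes and hashes lets integrity checks work on the raw bytes while logical comparisons can ignore how well the codec happened to compress.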
diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp
index 3d4aa0a7707..8df89548897 100644
--- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp
@@ -110,6 +110,10 @@ void MergeTreeDataPartWriterWide::addStreams(
     else /// otherwise return only generic codecs and don't use info about the data_type
         compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, nullptr, default_codec, true);
 
+    ParserCodec codec_parser;
+    auto ast = parseQuery(codec_parser, "(" + Poco::toUpper(settings.marks_compression_codec) + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
+    CompressionCodecPtr marks_compression_codec = CompressionCodecFactory::instance().get(ast, nullptr);
+
     column_streams[stream_name] = std::make_unique<Stream>(
         stream_name,
         data_part_storage_builder,
@@ -117,6 +121,8 @@ void MergeTreeDataPartWriterWide::addStreams(
         stream_name, marks_file_extension,
         compression_codec,
         settings.max_compress_block_size,
+        marks_compression_codec,
+        settings.marks_compress_block_size,
         settings.query_write_settings);
     };
 
@@ -266,10 +272,10 @@ void MergeTreeDataPartWriterWide::writeSingleMark(
 void MergeTreeDataPartWriterWide::flushMarkToFile(const StreamNameAndMark & stream_with_mark, size_t rows_in_mark)
 {
     Stream & stream = *column_streams[stream_with_mark.stream_name];
-    writeIntBinary(stream_with_mark.mark.offset_in_compressed_file, stream.marks);
-    writeIntBinary(stream_with_mark.mark.offset_in_decompressed_block, stream.marks);
+    writeIntBinary(stream_with_mark.mark.offset_in_compressed_file, stream.is_compress_marks ? stream.marks_compressed : stream.marks_hashing);
+    writeIntBinary(stream_with_mark.mark.offset_in_decompressed_block, stream.is_compress_marks ? stream.marks_compressed : stream.marks_hashing);
     if (settings.can_use_adaptive_granularity)
-        writeIntBinary(rows_in_mark, stream.marks);
+        writeIntBinary(rows_in_mark, stream.is_compress_marks ? stream.marks_compressed : stream.marks_hashing);
 }
 
 StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
@@ -420,7 +426,10 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
     if (!data_part_storage->exists(mrk_path))
         return;
 
-    auto mrk_in = data_part_storage->readFile(mrk_path, {}, std::nullopt, std::nullopt);
+    auto mrk_file_in = data_part_storage->readFile(mrk_path, {}, std::nullopt, std::nullopt);
+    DB::CompressedReadBufferFromFile mrk_compressed_in(data_part_storage->readFile(mrk_path, {}, std::nullopt, std::nullopt));
+    ReadBuffer & mrk_in = data_part->index_granularity_info.is_compress_marks ? static_cast<ReadBuffer &>(mrk_compressed_in)
+                                                                              : *mrk_file_in;
 
     DB::CompressedReadBufferFromFile bin_in(data_part_storage->readFile(bin_path, {}, std::nullopt, std::nullopt));
     bool must_be_last = false;
     UInt64 offset_in_compressed_file = 0;
@@ -429,15 +438,15 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
 
     size_t mark_num;
 
-    for (mark_num = 0; !mrk_in->eof(); ++mark_num)
+    for (mark_num = 0; !mrk_in.eof(); ++mark_num)
     {
         if (mark_num > index_granularity.getMarksCount())
             throw Exception(ErrorCodes::LOGICAL_ERROR, "Incorrect number of marks in memory {}, on disk (at least) {}", index_granularity.getMarksCount(), mark_num + 1);
 
-        DB::readBinary(offset_in_compressed_file, *mrk_in);
-        DB::readBinary(offset_in_decompressed_block, *mrk_in);
+        DB::readBinary(offset_in_compressed_file, mrk_in);
+        DB::readBinary(offset_in_decompressed_block, mrk_in);
         if (settings.can_use_adaptive_granularity)
-            DB::readBinary(index_granularity_rows, *mrk_in);
+            DB::readBinary(index_granularity_rows, mrk_in);
         else
             index_granularity_rows = data_part->index_granularity_info.fixed_index_granularity;
 
@@ -446,7 +455,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
             if (index_granularity_rows != 0)
                 throw Exception(ErrorCodes::LOGICAL_ERROR, "We ran out of binary data but still have non empty mark #{} with rows number {}", mark_num, index_granularity_rows);
 
-            if (!mrk_in->eof())
+            if (!mrk_in.eof())
                 throw Exception(ErrorCodes::LOGICAL_ERROR, "Mark #{} must be last, but we still have some to read", mark_num);
 
             break;
@@ -508,7 +517,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
         }
     }
 
-    if (!mrk_in->eof())
+    if (!mrk_in.eof())
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Still have something in marks stream, last mark #{} index granularity size {}, last rows {}", mark_num, index_granularity.getMarksCount(), index_granularity_rows);
     if (!bin_in.eof())
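validateColumnOfFixedSize keeps both candidate readers alive for the whole scope and binds a base-class reference to whichever applies, so the validation loop stays agnostic of compression. The same lifetime pattern with std::istream stand-ins (illustrative; the real code binds ReadBuffer & to either a CompressedReadBufferFromFile or the plain file buffer):

    #include <cassert>
    #include <sstream>
    #include <string>

    int main()
    {
        bool is_compress_marks = false;  // imagine index_granularity_info.is_compress_marks

        // Both buffers must outlive the reference, so both are declared up front.
        std::istringstream plain_in("12345");
        std::stringstream compressed_in("67890");  // a different istream-derived type, like the real code

        std::istream & in = is_compress_marks ? static_cast<std::istream &>(compressed_in) : plain_in;

        std::string token;
        in >> token;
        assert(token == "12345");
    }

The explicit cast on one arm gives the conditional operator a common type (ReadBuffer & in the real code); both operands then bind as lvalue references to the shared base class.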
diff --git a/src/Storages/MergeTree/MergeTreeIOSettings.h b/src/Storages/MergeTree/MergeTreeIOSettings.h
index 55848e09434..47e9ff63630 100644
--- a/src/Storages/MergeTree/MergeTreeIOSettings.h
+++ b/src/Storages/MergeTree/MergeTreeIOSettings.h
@@ -43,6 +43,11 @@ struct MergeTreeWriterSettings
         , max_compress_block_size(
             storage_settings->max_compress_block_size ? storage_settings->max_compress_block_size
                                                       : global_settings.max_compress_block_size)
+        , marks_compression_codec(storage_settings->marks_compression_codec)
+        , marks_compress_block_size(storage_settings->marks_compress_block_size)
+        , is_compress_primary_key(storage_settings->compress_primary_key)
+        , primary_key_compression_codec(storage_settings->primary_key_compression_codec)
+        , primary_key_compress_block_size(storage_settings->primary_key_compress_block_size)
         , can_use_adaptive_granularity(can_use_adaptive_granularity_)
         , rewrite_primary_key(rewrite_primary_key_)
         , blocks_are_granules_size(blocks_are_granules_size_)
@@ -52,6 +57,14 @@ struct MergeTreeWriterSettings
 
     size_t min_compress_block_size;
     size_t max_compress_block_size;
+
+    String marks_compression_codec;
+    size_t marks_compress_block_size;
+
+    bool is_compress_primary_key;
+    String primary_key_compression_codec;
+    size_t primary_key_compress_block_size;
+
     bool can_use_adaptive_granularity;
     bool rewrite_primary_key;
     bool blocks_are_granules_size;
diff --git a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp
index 6ae58dc4584..6b2e073b9bb 100644
--- a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp
+++ b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp
@@ -20,9 +20,12 @@ std::optional<std::string> MergeTreeIndexGranularityInfo::getMarksExtensionFromF
     for (auto it = data_part_storage->iterate(); it->isValid(); it->next())
     {
         const auto & ext = fs::path(it->name()).extension();
-        if (ext == getNonAdaptiveMrkExtension()
-            || ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide)
-            || ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact))
+        if (ext == getNonAdaptiveMrkExtension(false)
+            || ext == getNonAdaptiveMrkExtension(true)
+            || ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, false)
+            || ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Wide, true)
+            || ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, false)
+            || ext == getAdaptiveMrkExtension(MergeTreeDataPartType::Compact, true))
             return ext;
     }
 }
@@ -34,6 +37,7 @@ MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(const MergeTreeData
 {
     const auto storage_settings = storage.getSettings();
     fixed_index_granularity = storage_settings->index_granularity;
+    is_compress_marks = storage_settings->compress_marks;
 
     /// Granularity is fixed
     if (!storage.canUseAdaptiveGranularity())
@@ -49,21 +53,21 @@ MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(const MergeTreeData
 void MergeTreeIndexGranularityInfo::changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage)
 {
     auto mrk_ext = getMarksExtensionFromFilesystem(data_part_storage);
-    if (mrk_ext && *mrk_ext == getNonAdaptiveMrkExtension())
+    if (mrk_ext && *mrk_ext == getNonAdaptiveMrkExtension(is_compress_marks))
         setNonAdaptive();
 }
 
 void MergeTreeIndexGranularityInfo::setAdaptive(size_t index_granularity_bytes_)
 {
     is_adaptive = true;
-    marks_file_extension = getAdaptiveMrkExtension(type);
+    marks_file_extension = getAdaptiveMrkExtension(type, is_compress_marks);
     index_granularity_bytes = index_granularity_bytes_;
 }
 
 void MergeTreeIndexGranularityInfo::setNonAdaptive()
 {
     is_adaptive = false;
-    marks_file_extension = getNonAdaptiveMrkExtension();
+    marks_file_extension = getNonAdaptiveMrkExtension(is_compress_marks);
     index_granularity_bytes = 0;
 }
 
@@ -85,12 +89,12 @@ size_t getAdaptiveMrkSizeCompact(size_t columns_num)
     return sizeof(UInt64) * (columns_num * 2 + 1);
 }
 
-std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type)
+std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type, bool is_compress_marks)
 {
     if (part_type == MergeTreeDataPartType::Wide)
-        return ".mrk2";
+        return is_compress_marks ? ".cmrk2" : ".mrk2";
     else if (part_type == MergeTreeDataPartType::Compact)
-        return ".mrk3";
+        return is_compress_marks ? ".cmrk3" : ".mrk3";
     else if (part_type == MergeTreeDataPartType::InMemory)
         return "";
     else
diff --git a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h
index dbb027c244e..db59a194bd0 100644
--- a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h
+++ b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h
@@ -27,6 +27,9 @@ public:
     /// Approximate bytes size of one granule
     size_t index_granularity_bytes = 0;
 
+    /// Whether to compress marks
+    bool is_compress_marks;
+
     MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_);
 
     void changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage);
@@ -46,10 +49,10 @@ private:
     void setNonAdaptive();
 };
 
-constexpr inline auto getNonAdaptiveMrkExtension() { return ".mrk"; }
+constexpr inline auto getNonAdaptiveMrkExtension(bool is_compress_marks) { return is_compress_marks ? ".cmrk" : ".mrk"; }
 
 constexpr inline auto getNonAdaptiveMrkSizeWide() { return sizeof(UInt64) * 2; }
 constexpr inline auto getAdaptiveMrkSizeWide() { return sizeof(UInt64) * 3; }
 inline size_t getAdaptiveMrkSizeCompact(size_t columns_num);
-std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type);
+std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type, bool is_compress_marks);
 
 }
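Putting the helpers together, the full mark-file naming matrix after this patch is small enough to enumerate: the compressed variants just prefix the extension with 'c' (the primary key follows the same rule with .idx/.cidx). A standalone restatement of it (enum and function names are illustrative):

    #include <cassert>
    #include <string>

    enum class PartType { Wide, Compact, InMemory };

    // Mark extension matrix:
    //   non-adaptive (Wide) : .mrk  / .cmrk
    //   adaptive Wide       : .mrk2 / .cmrk2
    //   adaptive Compact    : .mrk3 / .cmrk3
    //   InMemory            : ""    (no marks file)
    std::string nonAdaptiveMrkExtension(bool compress) { return compress ? ".cmrk" : ".mrk"; }

    std::string adaptiveMrkExtension(PartType type, bool compress)
    {
        switch (type)
        {
            case PartType::Wide:     return compress ? ".cmrk2" : ".mrk2";
            case PartType::Compact:  return compress ? ".cmrk3" : ".mrk3";
            case PartType::InMemory: return "";
        }
        return "";
    }

    int main()
    {
        assert(nonAdaptiveMrkExtension(true) == ".cmrk");
        assert(adaptiveMrkExtension(PartType::Wide, true) == ".cmrk2");
        assert(adaptiveMrkExtension(PartType::Compact, false) == ".mrk3");
        assert(adaptiveMrkExtension(PartType::InMemory, true).empty());
    }

Because the extension now encodes both adaptivity and compression, MutateTask (below) can simply copy marks_file_extension from the source part instead of recomputing it.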
diff --git a/src/Storages/MergeTree/MergeTreeMarksLoader.cpp b/src/Storages/MergeTree/MergeTreeMarksLoader.cpp
index aeb00bfda79..a27a96e04cd 100644
--- a/src/Storages/MergeTree/MergeTreeMarksLoader.cpp
+++ b/src/Storages/MergeTree/MergeTreeMarksLoader.cpp
@@ -2,6 +2,7 @@
 #include
 #include
 #include
+#include <Compression/CompressedReadBufferFromFile.h>
 #include
 
 
@@ -54,38 +55,68 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
     size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark);
     size_t expected_file_size = mark_size * marks_count;
 
-    if (expected_file_size != file_size)
-        throw Exception(
-            ErrorCodes::CORRUPTED_DATA,
-            "Bad size of marks file '{}': {}, must be: {}",
-            std::string(fs::path(data_part_storage->getFullPath()) / mrk_path),
-            std::to_string(file_size), std::to_string(expected_file_size));
-
     auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_in_mark);
 
-    if (!index_granularity_info.is_adaptive)
+    if (!index_granularity_info.is_compress_marks)
     {
-        /// Read directly to marks.
-        auto buffer = data_part_storage->readFile(mrk_path, ReadSettings().adjustBufferSize(file_size), file_size, std::nullopt);
-        buffer->readStrict(reinterpret_cast<char *>(res->data()), file_size);
+        if (expected_file_size != file_size)
+            throw Exception(
+                ErrorCodes::CORRUPTED_DATA,
+                "Bad size of marks file '{}': {}, must be: {}",
+                std::string(fs::path(data_part_storage->getFullPath()) / mrk_path),
+                std::to_string(file_size), std::to_string(expected_file_size));
 
-        if (!buffer->eof())
-            throw Exception("Cannot read all marks from file " + mrk_path + ", eof: " + std::to_string(buffer->eof())
-                + ", buffer size: " + std::to_string(buffer->buffer().size()) + ", file size: " + std::to_string(file_size), ErrorCodes::CANNOT_READ_ALL_DATA);
+        if (!index_granularity_info.is_adaptive)
+        {
+            /// Read directly to marks.
+            auto buffer = data_part_storage->readFile(mrk_path, ReadSettings().adjustBufferSize(file_size), file_size, std::nullopt);
+            buffer->readStrict(reinterpret_cast<char *>(res->data()), file_size);
+
+            if (!buffer->eof())
+                throw Exception("Cannot read all marks from file " + mrk_path + ", eof: " + std::to_string(buffer->eof())
+                    + ", buffer size: " + std::to_string(buffer->buffer().size()) + ", file size: " + std::to_string(file_size), ErrorCodes::CANNOT_READ_ALL_DATA);
+        }
+        else
+        {
+            auto buffer = data_part_storage->readFile(mrk_path, ReadSettings().adjustBufferSize(file_size), file_size, std::nullopt);
+            size_t i = 0;
+            while (!buffer->eof())
+            {
+                res->read(*buffer, i * columns_in_mark, columns_in_mark);
+                buffer->seek(sizeof(size_t), SEEK_CUR);
+                ++i;
+            }
+
+            if (i * mark_size != file_size)
+                throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
+        }
     }
     else
     {
-        auto buffer = data_part_storage->readFile(mrk_path, ReadSettings().adjustBufferSize(file_size), file_size, std::nullopt);
-        size_t i = 0;
-        while (!buffer->eof())
+        if (!index_granularity_info.is_adaptive)
         {
-            res->read(*buffer, i * columns_in_mark, columns_in_mark);
-            buffer->seek(sizeof(size_t), SEEK_CUR);
-            ++i;
-        }
+            CompressedReadBufferFromFile buffer(data_part_storage->readFile(mrk_path, ReadSettings().adjustBufferSize(file_size), file_size, std::nullopt));
+            buffer.readStrict(reinterpret_cast<char *>(res->data()), expected_file_size);
 
-        if (i * mark_size != file_size)
-            throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
+            if (!buffer.eof())
+                throw Exception("Cannot read all marks from file " + mrk_path + ", eof: " + std::to_string(buffer.eof())
+                    + ", buffer size: " + std::to_string(buffer.buffer().size()) + ", file size: " + std::to_string(file_size), ErrorCodes::CANNOT_READ_ALL_DATA);
+        }
+        else
+        {
+            CompressedReadBufferFromFile buffer(data_part_storage->readFile(mrk_path, ReadSettings().adjustBufferSize(file_size), file_size, std::nullopt));
+            size_t i = 0;
+            size_t granularity;
+            while (!buffer.eof())
+            {
+                res->read(buffer, i * columns_in_mark, columns_in_mark);
+                readIntBinary(granularity, buffer);
+                ++i;
+            }
+
+            if (i != marks_count)
+                throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
+        }
     }
 
     res->protect();
     return res;
diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h
index d8dfa017b7f..73e16f096d5 100644
--- a/src/Storages/MergeTree/MergeTreeSettings.h
+++ b/src/Storages/MergeTree/MergeTreeSettings.h
@@ -152,7 +152,15 @@ struct Settings;
     M(UInt64, replicated_max_parallel_sends_for_table, 0, "Obsolete setting, does nothing.", 0) \
     M(UInt64, replicated_max_parallel_fetches, 0, "Obsolete setting, does nothing.", 0) \
     M(UInt64, replicated_max_parallel_fetches_for_table, 0, "Obsolete setting, does nothing.", 0) \
-    M(Bool, write_final_mark, true, "Obsolete setting, does nothing.", 0)
+    M(Bool, write_final_mark, true, "Obsolete setting, does nothing.", 0) \
+    \
+    /** Compress marks and primary key. */ \
+    M(Bool, compress_marks, false, "Marks support compression, reducing mark file size and speeding up network transmission.", 0) \
+    M(Bool, compress_primary_key, false, "The primary key supports compression, reducing primary key file size and speeding up network transmission.", 0) \
+    M(String, marks_compression_codec, "ZSTD(3)", "Compression encoding used by marks; marks are small enough and cached, so the default compression is ZSTD(3).", 0) \
primary_key_compression_codec, "ZSTD(3)", "Compression encoding used by primary, primary key are small enough and cached, so the default compression is ZSTD(3).", 0) \ + M(UInt64, marks_compress_block_size, 65536, "Mark compress block size, the actual size of the block to compress.", 0) \ + M(UInt64, primary_key_compress_block_size, 65536, "Primary compress block size, the actual size of the block to compress.", 0) \ /// Settings that should not change after the creation of a table. /// NOLINTNEXTLINE #define APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(M) \ diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index d52948d71c1..83b3a670010 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1559,8 +1559,7 @@ bool MutateTask::prepare() ctx->new_data_part->partition.assign(ctx->source_part->partition); /// Don't change granularity type while mutating subset of columns - ctx->mrk_extension = ctx->source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension(ctx->new_data_part->getType()) - : getNonAdaptiveMrkExtension(); + ctx->mrk_extension = ctx->source_part->index_granularity_info.marks_file_extension; const auto data_settings = ctx->data->getSettings(); ctx->need_sync = needSyncPart(ctx->source_part->rows_count, ctx->source_part->getBytesOnDisk(), *data_settings); diff --git a/src/Storages/MergeTree/PartMetadataManagerOrdinary.cpp b/src/Storages/MergeTree/PartMetadataManagerOrdinary.cpp index da147ff1f0e..5821873afbe 100644 --- a/src/Storages/MergeTree/PartMetadataManagerOrdinary.cpp +++ b/src/Storages/MergeTree/PartMetadataManagerOrdinary.cpp @@ -1,6 +1,7 @@ #include "PartMetadataManagerOrdinary.h" #include +#include #include #include @@ -18,9 +19,13 @@ PartMetadataManagerOrdinary::PartMetadataManagerOrdinary(const IMergeTreeDataPar } -std::unique_ptr PartMetadataManagerOrdinary::read(const String & file_name) const +std::unique_ptr PartMetadataManagerOrdinary::read(const String & file_name) const { - return openForReading(part->data_part_storage, file_name); + if (!isCompressFromFileName(file_name)) + return openForReading(part->data_part_storage, file_name); + + String file_path = fs::path(part->data_part_storage->getRelativePath()) / file_name; + return std::make_unique(openForReading(part->data_part_storage, file_path)); } bool PartMetadataManagerOrdinary::exists(const String & file_name) const diff --git a/src/Storages/MergeTree/PartMetadataManagerOrdinary.h b/src/Storages/MergeTree/PartMetadataManagerOrdinary.h index a655431296a..d86d5c54c00 100644 --- a/src/Storages/MergeTree/PartMetadataManagerOrdinary.h +++ b/src/Storages/MergeTree/PartMetadataManagerOrdinary.h @@ -12,7 +12,7 @@ public: ~PartMetadataManagerOrdinary() override = default; - std::unique_ptr read(const String & file_name) const override; + std::unique_ptr read(const String & file_name) const override; bool exists(const String & file_name) const override; diff --git a/src/Storages/MergeTree/PartMetadataManagerWithCache.cpp b/src/Storages/MergeTree/PartMetadataManagerWithCache.cpp index 9930aca2576..2e0893cbd86 100644 --- a/src/Storages/MergeTree/PartMetadataManagerWithCache.cpp +++ b/src/Storages/MergeTree/PartMetadataManagerWithCache.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include namespace ProfileEvents @@ -38,7 +39,7 @@ String PartMetadataManagerWithCache::getFilePathFromKey(const String & key) cons return key.substr(part->data_part_storage->getDiskName().size() + 1); } -std::unique_ptr 
diff --git a/src/Storages/MergeTree/PartMetadataManagerWithCache.cpp b/src/Storages/MergeTree/PartMetadataManagerWithCache.cpp
index 9930aca2576..2e0893cbd86 100644
--- a/src/Storages/MergeTree/PartMetadataManagerWithCache.cpp
+++ b/src/Storages/MergeTree/PartMetadataManagerWithCache.cpp
@@ -5,6 +5,7 @@
 #include
 #include
 #include
+#include <Compression/CompressedReadBuffer.h>
 #include
 
 namespace ProfileEvents
@@ -38,7 +39,7 @@ String PartMetadataManagerWithCache::getFilePathFromKey(const String & key) cons
     return key.substr(part->data_part_storage->getDiskName().size() + 1);
 }
 
-std::unique_ptr<SeekableReadBuffer> PartMetadataManagerWithCache::read(const String & file_name) const
+std::unique_ptr<ReadBuffer> PartMetadataManagerWithCache::read(const String & file_name) const
 {
     String file_path = fs::path(part->data_part_storage->getRelativePath()) / file_name;
     String key = getKeyFromFilePath(file_path);
@@ -47,8 +48,16 @@ std::unique_ptr<ReadBuffer> PartMetadataManagerWithCache::read(const Str
     if (!status.ok())
     {
         ProfileEvents::increment(ProfileEvents::MergeTreeMetadataCacheMiss);
-        auto in = part->data_part_storage->readFile(file_name, {}, std::nullopt, std::nullopt);
-        readStringUntilEOF(value, *in);
+        if (!isCompressFromFileName(file_name))
+        {
+            auto in = part->data_part_storage->readFile(file_name, {}, std::nullopt, std::nullopt);
+            readStringUntilEOF(value, *in);
+        }
+        else
+        {
+            auto in = CompressedReadBuffer(*part->data_part_storage->readFile(file_name, {}, std::nullopt, std::nullopt));
+            readStringUntilEOF(value, in);
+        }
         cache->put(key, value);
     }
     else
diff --git a/src/Storages/MergeTree/PartMetadataManagerWithCache.h b/src/Storages/MergeTree/PartMetadataManagerWithCache.h
index 06e7a85ba2b..791681ee5bb 100644
--- a/src/Storages/MergeTree/PartMetadataManagerWithCache.h
+++ b/src/Storages/MergeTree/PartMetadataManagerWithCache.h
@@ -19,7 +19,7 @@ public:
     ~PartMetadataManagerWithCache() override = default;
 
     /// First read the metadata from RocksDB cache, then from disk.
-    std::unique_ptr<SeekableReadBuffer> read(const String & file_name) const override;
+    std::unique_ptr<ReadBuffer> read(const String & file_name) const override;
 
     /// First judge existence of the metadata in RocksDB cache, then in disk.
     bool exists(const String & file_name) const override;
@@ -48,7 +48,6 @@ private:
     /// Get cache keys and checksums of corresponding metadata in a part(including projection parts)
     void getKeysAndCheckSums(Strings & keys, std::vector<uint128> & checksums) const;
 
-
     MergeTreeMetadataCachePtr cache;
 };
diff --git a/tests/queries/0_stateless/02381_compress_marks_and_primary_key.reference b/tests/queries/0_stateless/02381_compress_marks_and_primary_key.reference
new file mode 100644
index 00000000000..0f7bb82d7f4
--- /dev/null
+++ b/tests/queries/0_stateless/02381_compress_marks_and_primary_key.reference
@@ -0,0 +1,4 @@
+test_02381 11904
+test_02381_compress 1658
+1000000
+1000 10000
diff --git a/tests/queries/0_stateless/02381_compress_marks_and_primary_key.sql b/tests/queries/0_stateless/02381_compress_marks_and_primary_key.sql
new file mode 100644
index 00000000000..d7c45adf0ef
--- /dev/null
+++ b/tests/queries/0_stateless/02381_compress_marks_and_primary_key.sql
@@ -0,0 +1,17 @@
+drop table if exists test_02381;
+create table test_02381(a UInt64, b UInt64) ENGINE = MergeTree order by (a, b);
+insert into test_02381 select number, number from system.numbers limit 1000000;
+
+drop table if exists test_02381_compress;
+create table test_02381_compress(a UInt64, b UInt64) ENGINE = MergeTree order by (a, b)
+    SETTINGS compress_marks=1, compress_primary_key=1, marks_compression_codec='ZSTD(3)', primary_key_compression_codec='ZSTD(3)', marks_compress_block_size=65536, primary_key_compress_block_size=65536;
+insert into test_02381_compress select number, number * 10 from system.numbers limit 1000000;
+
+-- Compare the size of marks on disk
+select table, sum(marks_bytes) from system.parts_columns where database = currentDatabase() and table like 'test_02381%' group by table order by table;
+
+select count(*) from test_02381_compress;
+select * from test_02381_compress where a = 1000;
+
+drop table if exists test_02381;
+drop table if exists test_02381_compress;