diff --git a/src/Compression/CompressionCodecMultiple.cpp b/src/Compression/CompressionCodecMultiple.cpp index d762563b758..868df90825e 100644 --- a/src/Compression/CompressionCodecMultiple.cpp +++ b/src/Compression/CompressionCodecMultiple.cpp @@ -106,6 +106,15 @@ void CompressionCodecMultiple::doDecompressData(const char * source, UInt32 sour memcpy(dest, compressed_buf.data(), decompressed_size); } +std::vector CompressionCodecMultiple::getCodecsBytesFromData(const char * source) +{ + std::vector result; + uint8_t compression_methods_size = source[0]; + for (size_t i = 0; i < compression_methods_size; ++i) + result.push_back(source[1 + i]); + return result; +} + bool CompressionCodecMultiple::isCompression() const { for (const auto & codec : codecs) diff --git a/src/Compression/CompressionCodecMultiple.h b/src/Compression/CompressionCodecMultiple.h index dded559ca0b..cd50d3250e3 100644 --- a/src/Compression/CompressionCodecMultiple.h +++ b/src/Compression/CompressionCodecMultiple.h @@ -17,6 +17,8 @@ public: UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override; + static std::vector getCodecsBytesFromData(const char * source); + protected: UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; diff --git a/src/Compression/getCompressionCodecForFile.cpp b/src/Compression/getCompressionCodecForFile.cpp new file mode 100644 index 00000000000..05dd49f3939 --- /dev/null +++ b/src/Compression/getCompressionCodecForFile.cpp @@ -0,0 +1,43 @@ +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ + + +using Checksum = CityHash_v1_0_2::uint128; + +CompressionCodecPtr getCompressionCodecForFile(const DiskPtr & disk, const String & relative_path) +{ + auto read_buffer = disk->readFile(relative_path); + read_buffer->ignore(sizeof(Checksum)); + + UInt8 header_size = ICompressionCodec::getHeaderSize(); + PODArray compressed_buffer; + compressed_buffer.resize(header_size); +
read_buffer->readStrict(compressed_buffer.data(), header_size); + uint8_t method = ICompressionCodec::readMethod(compressed_buffer.data()); + if (method == static_cast(CompressionMethodByte::Multiple)) + { + compressed_buffer.resize(1); + read_buffer->readStrict(compressed_buffer.data(), 1); + compressed_buffer.resize(1 + compressed_buffer[0]); + read_buffer->readStrict(compressed_buffer.data() + 1, compressed_buffer[0]); + auto codecs_bytes = CompressionCodecMultiple::getCodecsBytesFromData(compressed_buffer.data()); + Codecs codecs; + for (auto byte : codecs_bytes) + codecs.push_back(CompressionCodecFactory::instance().get(byte)); + + return std::make_shared(codecs); + } + return CompressionCodecFactory::instance().get(method); +} + +} diff --git a/src/Compression/getCompressionCodecForFile.h b/src/Compression/getCompressionCodecForFile.h new file mode 100644 index 00000000000..4870de8b3b3 --- /dev/null +++ b/src/Compression/getCompressionCodecForFile.h @@ -0,0 +1,15 @@ +#pragma once + +#include +#include + +namespace DB +{ + +/// Returns the compression codec, with default parameters, for a file compressed in +/// ClickHouse fashion (with checksums, headers for each block, etc). This +/// function should be used as a fallback when the compression codec cannot be deduced +/// from metadata.
+CompressionCodecPtr getCompressionCodecForFile(const DiskPtr & disk, const String & relative_path); + +} diff --git a/src/Compression/ya.make b/src/Compression/ya.make index 82952bd1f72..8ea8419e40e 100644 --- a/src/Compression/ya.make +++ b/src/Compression/ya.make @@ -29,6 +29,7 @@ SRCS( CompressionFactory.cpp ICompressionCodec.cpp LZ4_decompress_faster.cpp + getCompressionCodecForFile.cpp ) diff --git a/src/Storages/ColumnsDescription.cpp b/src/Storages/ColumnsDescription.cpp index bc800a47cc1..4889e39d8e0 100644 --- a/src/Storages/ColumnsDescription.cpp +++ b/src/Storages/ColumnsDescription.cpp @@ -404,6 +404,16 @@ std::optional ColumnsDescription::getDefault(const String & colum } +bool ColumnsDescription::hasCompressionCodec(const String & column_name) const +{ + const auto it = columns.get<1>().find(column_name); + + if (it == columns.get<1>().end() || !it->codec) + return false; + + return true; +} + CompressionCodecPtr ColumnsDescription::getCodecOrDefault(const String & column_name, CompressionCodecPtr default_codec) const { const auto it = columns.get<1>().find(column_name); diff --git a/src/Storages/ColumnsDescription.h b/src/Storages/ColumnsDescription.h index 145cadf8fb1..6e2d3299091 100644 --- a/src/Storages/ColumnsDescription.h +++ b/src/Storages/ColumnsDescription.h @@ -111,6 +111,8 @@ public: bool hasDefault(const String & column_name) const; std::optional getDefault(const String & column_name) const; + /// Returns true if the column exists and has an explicitly specified (non-default) compression codec + bool hasCompressionCodec(const String & column_name) const; CompressionCodecPtr getCodecOrDefault(const String & column_name, CompressionCodecPtr default_codec) const; CompressionCodecPtr getCodecOrDefault(const String & column_name) const; diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index 198f3c53e9b..e4bdedc17f8 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@
-316,7 +316,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory( new_data_part->minmax_idx.update(block, data.minmax_idx_columns); new_data_part->partition.create(metadata_snapshot, block, 0); - MergedBlockOutputStream part_out(new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {}, nullptr); + MergedBlockOutputStream part_out(new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {})); part_out.writePrefix(); part_out.write(block); part_out.writeSuffixAndFinalizePart(new_data_part); @@ -401,8 +401,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk( new_data_part->is_temp = true; new_data_part->modification_time = time(nullptr); new_data_part->loadColumnsChecksumsIndexes(true, false); - if (!new_data_part->loadDefaultCompressionCodec()) - new_data_part->detectAndSetDefaultCompressionCodec(data.getTotalActiveSizeInBytes()); new_data_part->checksums.checkEqual(checksums, false); return new_data_part; diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index bcdcd9e7c90..dfe970fa7bb 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -13,6 +13,8 @@ #include #include #include +#include +#include namespace DB { @@ -416,6 +418,8 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks loadTTLInfos(); if (check_consistency) checkConsistency(require_columns_checksums); + loadDefaultCompressionCodec(); + } void IMergeTreeDataPart::loadIndexGranularity() @@ -482,56 +486,75 @@ NameSet IMergeTreeDataPart::getFileNamesWithoutChecksums() const return result; } -bool IMergeTreeDataPart::loadDefaultCompressionCodec() +void IMergeTreeDataPart::loadDefaultCompressionCodec() { /// In memory parts doesn't have any compression if (!isStoredOnDisk()) { default_codec = CompressionCodecFactory::instance().get("NONE", {}); - return true; +
return; } String path = getFullRelativePath() + DEFAULT_COMPRESSION_CODEC_FILE_NAME; if (!volume->getDisk()->exists(path)) - return false; - - - auto file_buf = openForReading(volume->getDisk(), path); - String codec_line; - readEscapedStringUntilEOL(codec_line, *file_buf); - - ReadBufferFromString buf(codec_line); - - if (!checkString("CODEC", buf)) { - LOG_WARNING(storage.log, "Cannot parse default codec for part {} from file {}, content '{}'. Default compression codec will be deduced automatically, from compression section in config.xml.", name, path, codec_line); - return false; + default_codec = detectDefaultCompressionCodec(); } + else + { - try - { - ParserCodec codec_parser; - auto codec_ast = parseQuery(codec_parser, codec_line.data() + buf.getPosition(), codec_line.data() + codec_line.length(), "codec parser", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH); - default_codec = CompressionCodecFactory::instance().get(codec_ast, {}); - return true; - } - catch (const DB::Exception & ex) - { - LOG_WARNING(storage.log, "Cannot parse default codec for part {} from file {}, content '{}', error '{}'. Default compression codec will be deduced automatically, from compression section in config.xml.", name, path, codec_line, ex.what()); - return false; + auto file_buf = openForReading(volume->getDisk(), path); + String codec_line; + readEscapedStringUntilEOL(codec_line, *file_buf); + + ReadBufferFromString buf(codec_line); + + if (!checkString("CODEC", buf)) + { + LOG_WARNING(storage.log, "Cannot parse default codec for part {} from file {}, content '{}'.
Default compression codec will be deduced automatically, from data on disk", name, path, codec_line); + default_codec = detectDefaultCompressionCodec(); + /// Without the CODEC prefix there is nothing valid to parse: keep the codec detected from data on disk. + return; + } + + try + { + ParserCodec codec_parser; + auto codec_ast = parseQuery(codec_parser, codec_line.data() + buf.getPosition(), codec_line.data() + codec_line.length(), "codec parser", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH); + default_codec = CompressionCodecFactory::instance().get(codec_ast, {}); + } + catch (const DB::Exception & ex) + { + LOG_WARNING(storage.log, "Cannot parse default codec for part {} from file {}, content '{}', error '{}'. Default compression codec will be deduced automatically, from data on disk.", name, path, codec_line, ex.what()); + default_codec = detectDefaultCompressionCodec(); + } } } - -void IMergeTreeDataPart::detectAndSetDefaultCompressionCodec(size_t total_storage_size) +CompressionCodecPtr IMergeTreeDataPart::detectDefaultCompressionCodec() const { /// In memory parts doesn't have any compression - if (isStoredOnDisk()) - default_codec - = storage.global_context.chooseCompressionCodec(getBytesOnDisk(), - static_cast(getBytesOnDisk()) / total_storage_size); - else - default_codec = CompressionCodecFactory::instance().get("NONE", {}); + if (!isStoredOnDisk()) + return CompressionCodecFactory::instance().get("NONE", {}); + + auto metadata_snapshot = storage.getInMemoryMetadataPtr(); + + const auto & storage_columns = metadata_snapshot->getColumns(); + CompressionCodecPtr result = nullptr; + for (const auto & part_column : columns) + { + /// It was compressed with default codec + if (!storage_columns.hasCompressionCodec(part_column.name)) + { + result = getCompressionCodecForFile(volume->getDisk(), getFullRelativePath() + getFileNameForColumn(part_column) + ".bin"); + break; + } + } + + if (!result) + result = CompressionCodecFactory::instance().getDefaultCodec(); + + return result; } void IMergeTreeDataPart::loadPartitionAndMinMaxIndex() diff --git
a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index 163aa50698a..35e82e0e94a 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -332,19 +332,11 @@ public: String getRelativePathForPrefix(const String & prefix) const; - /// Detect default codec for part based on compression section from - /// config.xml (require total size of all parts in table) - void detectAndSetDefaultCompressionCodec(size_t total_storage_size); /// Return set of metadat file names without checksums. For example, /// columns.txt or checksums.txt itself. NameSet getFileNamesWithoutChecksums() const; - /// Load default compression codec from file default_compression_codec.txt - /// return false if load was not successful (for example file doesn't - /// exists) - bool loadDefaultCompressionCodec(); - /// File with compression codec name which was used to compress part columns /// by default. Some columns may have their own compression codecs, but /// default will be stored in this file. @@ -403,6 +395,15 @@ private: void loadTTLInfos(); void loadPartitionAndMinMaxIndex(); + + /// Loads the default compression codec from the file default_compression_codec.txt. + /// If the file does not exist or cannot be parsed, deduces the codec from the data + /// of a column that has no column-specific compression codec. + void loadDefaultCompressionCodec(); + + /// Finds a column without a column-specific compression codec and returns the + /// codec detected for that column's data file, with default parameters.
+ CompressionCodecPtr detectDefaultCompressionCodec() const; }; using MergeTreeDataPartState = IMergeTreeDataPart::State; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index d1178ad15a5..9b9d75cc7c7 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -751,12 +751,10 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) DataPartsVector broken_parts_to_remove; DataPartsVector broken_parts_to_detach; - MutableDataPartsVector parts_without_default_compression; size_t suspicious_broken_parts = 0; std::atomic has_adaptive_parts = false; std::atomic has_non_adaptive_parts = false; - std::atomic total_active_parts_size = 0; ThreadPool pool(num_threads); @@ -789,13 +787,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) try { part->loadColumnsChecksumsIndexes(require_part_metadata, true); - /// If it was not successful we will try to get default - /// compression from config.xml - if (!part->loadDefaultCompressionCodec()) - { - std::lock_guard loading_lock(mutex); - parts_without_default_compression.push_back(part); - } } catch (const Exception & e) { @@ -878,7 +869,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) std::lock_guard loading_lock(mutex); if (!data_parts_indexes.insert(part).second) throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART); - total_active_parts_size += part->getBytesOnDisk(); }); } @@ -897,11 +887,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART); } - /// Deduce codec based on part size and total parts size with rules from - /// config.xml - for (auto & part : parts_without_default_compression) - part->detectAndSetDefaultCompressionCodec(total_active_parts_size); - if (has_non_adaptive_parts && has_adaptive_parts && !settings->enable_mixed_granularity_parts) throw
Exception("Table contains parts with adaptive and non adaptive marks, but `setting enable_mixed_granularity_parts` is disabled", ErrorCodes::LOGICAL_ERROR); @@ -2433,14 +2418,12 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na } -static void loadPartAndFixMetadataImpl(MergeTreeData::MutableDataPartPtr part, size_t total_active_parts_size) +static void loadPartAndFixMetadataImpl(MergeTreeData::MutableDataPartPtr part) { auto disk = part->volume->getDisk(); String full_part_path = part->getFullRelativePath(); part->loadColumnsChecksumsIndexes(false, true); - if (!part->loadDefaultCompressionCodec()) - part->detectAndSetDefaultCompressionCodec(total_active_parts_size); part->modification_time = disk->getLastModified(full_part_path).epochTime(); } @@ -2915,13 +2898,12 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const MutableDataPartsVector loaded_parts; loaded_parts.reserve(renamed_parts.old_and_new_names.size()); - size_t total_active_parts_size = getTotalActiveSizeInBytes(); for (const auto & part_names : renamed_parts.old_and_new_names) { LOG_DEBUG(log, "Checking part {}", part_names.second); auto single_disk_volume = std::make_shared("volume_" + part_names.first, name_to_disk[part_names.first]); MutableDataPartPtr part = createPart(part_names.first, single_disk_volume, source_dir + part_names.second); - loadPartAndFixMetadataImpl(part, total_active_parts_size); + loadPartAndFixMetadataImpl(part); loaded_parts.push_back(part); } @@ -3291,7 +3273,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk( dst_data_part->is_temp = true; dst_data_part->loadColumnsChecksumsIndexes(require_part_metadata, true); - dst_data_part->default_codec = src_part->default_codec; dst_data_part->modification_time = disk->getLastModified(dst_part_path).epochTime(); return dst_data_part; } diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp
b/src/Storages/MergeTree/MergedBlockOutputStream.cpp index f78fa24b93c..b78b23b2080 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -122,7 +122,8 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart( new_part->setBytesOnDisk(checksums.getTotalSizeOnDisk()); new_part->index_granularity = writer->getIndexGranularity(); new_part->calculateColumnsSizesOnDisk(); - new_part->default_codec = default_codec; + if (default_codec != nullptr) + new_part->default_codec = default_codec; } void MergedBlockOutputStream::finalizePartOnDisk( @@ -165,10 +166,16 @@ void MergedBlockOutputStream::finalizePartOnDisk( part_columns.writeText(*out); } + if (default_codec != nullptr) { auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096); DB::writeText(queryToString(default_codec->getFullCodecDesc()), *out); } + else + { + throw Exception("Compression codec have to be specified for part on disk, empty for " + new_part->name + + ". It is a bug.", ErrorCodes::LOGICAL_ERROR); + } { /// Write file with checksums.
diff --git a/tests/integration/test_default_compression_codec/test.py b/tests/integration/test_default_compression_codec/test.py index 89c0f082a33..8ca5a1cfc1a 100644 --- a/tests/integration/test_default_compression_codec/test.py +++ b/tests/integration/test_default_compression_codec/test.py @@ -8,6 +8,7 @@ cluster = ClickHouseCluster(__file__) node1 = cluster.add_instance('node1', main_configs=['configs/default_compression.xml'], with_zookeeper=True) node2 = cluster.add_instance('node2', main_configs=['configs/default_compression.xml'], with_zookeeper=True) +node3 = cluster.add_instance('node3', main_configs=['configs/default_compression.xml'], image='yandex/clickhouse-server:20.3.16', stay_alive=True, with_installed_binary=True) @pytest.fixture(scope="module") @@ -172,3 +173,28 @@ def test_default_codec_multiple(start_cluster): assert node1.query("SELECT COUNT() FROM compression_table_multiple") == "3\n" assert node2.query("SELECT COUNT() FROM compression_table_multiple") == "3\n" + + +def test_default_codec_version_update(start_cluster): + node3.query(""" + CREATE TABLE compression_table ( + key UInt64 CODEC(LZ4HC(7)), + data1 String + ) ENGINE = MergeTree ORDER BY tuple() PARTITION BY key; + """) + + node3.query("INSERT INTO compression_table VALUES (1, 'x')") + node3.query("INSERT INTO compression_table VALUES (2, '{}')".format(get_random_string(2048))) + node3.query("INSERT INTO compression_table VALUES (3, '{}')".format(get_random_string(22048))) + + node3.restart_with_latest_version() # node3 is created from the 20.3.16 image; the parts inserted above are checked again after this restart + + assert node3.query("SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '1_1_1_0'") == "ZSTD(1)\n" + assert node3.query("SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '2_2_2_0'") == "ZSTD(1)\n" + assert node3.query("SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '3_3_3_0'") == "ZSTD(1)\n" + + node3.query("OPTIMIZE TABLE
compression_table FINAL") + + assert node3.query("SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '1_1_1_1'") == "ZSTD(10)\n" + assert node3.query("SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '2_2_2_1'") == "LZ4HC(5)\n" + assert node3.query("SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '3_3_3_1'") == "LZ4\n" diff --git a/tests/integration/test_partition/test.py b/tests/integration/test_partition/test.py index 21b1e55f666..80fbe947316 100644 --- a/tests/integration/test_partition/test.py +++ b/tests/integration/test_partition/test.py @@ -84,6 +84,8 @@ def partition_complex_assert_checksums(): "77d5af402ada101574f4da114f242e02\tshadow/1/data/test/partition/19700201_1_1_0/columns.txt\n" \ "88cdc31ded355e7572d68d8cde525d3a\tshadow/1/data/test/partition/19700201_1_1_0/p.bin\n" \ "9e688c58a5487b8eaf69c9e1005ad0bf\tshadow/1/data/test/partition/19700102_2_2_0/primary.idx\n" \ + "c0904274faa8f3f06f35666cc9c5bd2f\tshadow/1/data/test/partition/19700102_2_2_0/default_compression_codec.txt\n" \ + "c0904274faa8f3f06f35666cc9c5bd2f\tshadow/1/data/test/partition/19700201_1_1_0/default_compression_codec.txt\n" \ "c4ca4238a0b923820dcc509a6f75849b\tshadow/1/data/test/partition/19700102_2_2_0/count.txt\n" \ "c4ca4238a0b923820dcc509a6f75849b\tshadow/1/data/test/partition/19700201_1_1_0/count.txt\n" \ "cfcb770c3ecd0990dcceb1bde129e6c6\tshadow/1/data/test/partition/19700102_2_2_0/p.bin\n" \