From 22ceaa00669d19c5f353b4d4b496244ccbeb25ba Mon Sep 17 00:00:00 2001 From: Alex Katsman Date: Wed, 4 Sep 2024 16:50:22 +0000 Subject: [PATCH 1/2] Use another error code for external data in buffer Use CANNOT_DECOMPRESS error code for the compressed external data in CompressedReadBufferBase for checksum validation and decompression. --- src/Compression/CompressedReadBufferBase.cpp | 18 ++++++++++-------- src/Server/TCPHandler.cpp | 4 ++-- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/Compression/CompressedReadBufferBase.cpp b/src/Compression/CompressedReadBufferBase.cpp index e416fadc829..907e87a6d30 100644 --- a/src/Compression/CompressedReadBufferBase.cpp +++ b/src/Compression/CompressedReadBufferBase.cpp @@ -39,7 +39,7 @@ using Checksum = CityHash_v1_0_2::uint128; /// Validate checksum of data, and if it mismatches, find out possible reason and throw exception. -static void validateChecksum(char * data, size_t size, const Checksum expected_checksum) +static void validateChecksum(char * data, size_t size, const Checksum expected_checksum, bool external_data) { auto calculated_checksum = CityHash_v1_0_2::CityHash128(data, size); if (expected_checksum == calculated_checksum) @@ -64,6 +64,8 @@ static void validateChecksum(char * data, size_t size, const Checksum expected_c "this can be caused by disk bit rot. This exception protects ClickHouse " "from data corruption due to hardware failures."; + int error_code = external_data ? ErrorCodes::CANNOT_DECOMPRESS : ErrorCodes::CHECKSUM_DOESNT_MATCH; + auto flip_bit = [](char * buf, size_t pos) { buf[pos / 8] ^= 1 << pos % 8; @@ -87,7 +89,7 @@ static void validateChecksum(char * data, size_t size, const Checksum expected_c { message << ". The mismatch is caused by single bit flip in data block at byte " << (bit_pos / 8) << ", bit " << (bit_pos % 8) << ". " << message_hardware_failure; - throw Exception::createDeprecated(message.str(), ErrorCodes::CHECKSUM_DOESNT_MATCH); + throw Exception::createDeprecated(message.str(), error_code); } flip_bit(tmp_data, bit_pos); /// Restore @@ -102,10 +104,10 @@ static void validateChecksum(char * data, size_t size, const Checksum expected_c { message << ". The mismatch is caused by single bit flip in checksum. " << message_hardware_failure; - throw Exception::createDeprecated(message.str(), ErrorCodes::CHECKSUM_DOESNT_MATCH); + throw Exception::createDeprecated(message.str(), error_code); } - throw Exception::createDeprecated(message.str(), ErrorCodes::CHECKSUM_DOESNT_MATCH); + throw Exception::createDeprecated(message.str(), error_code); } static void readHeaderAndGetCodecAndSize( @@ -151,7 +153,7 @@ static void readHeaderAndGetCodecAndSize( "Most likely corrupted data.", size_compressed_without_checksum); if (size_compressed_without_checksum < header_size) - throw Exception(ErrorCodes::CORRUPTED_DATA, "Can't decompress data: " + throw Exception(external_data ? ErrorCodes::CANNOT_DECOMPRESS : ErrorCodes::CORRUPTED_DATA, "Can't decompress data: " "the compressed data size ({}, this should include header size) is less than the header size ({})", size_compressed_without_checksum, static_cast(header_size)); } @@ -202,7 +204,7 @@ size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed, readBinaryLittleEndian(checksum.low64, checksum_in); readBinaryLittleEndian(checksum.high64, checksum_in); - validateChecksum(compressed_buffer, size_compressed_without_checksum, checksum); + validateChecksum(compressed_buffer, size_compressed_without_checksum, checksum, external_data); } ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed_without_checksum + sizeof(Checksum)); @@ -247,7 +249,7 @@ size_t CompressedReadBufferBase::readCompressedDataBlockForAsynchronous(size_t & readBinaryLittleEndian(checksum.low64, checksum_in); readBinaryLittleEndian(checksum.high64, checksum_in); - validateChecksum(compressed_buffer, size_compressed_without_checksum, checksum); + validateChecksum(compressed_buffer, size_compressed_without_checksum, checksum, external_data); } ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed_without_checksum + sizeof(Checksum)); @@ -307,7 +309,7 @@ void CompressedReadBufferBase::decompress(BufferBase::Buffer & to, size_t size_d UInt8 header_size = ICompressionCodec::getHeaderSize(); if (size_compressed_without_checksum < header_size) - throw Exception(ErrorCodes::CORRUPTED_DATA, + throw Exception(external_data ? ErrorCodes::CANNOT_DECOMPRESS : ErrorCodes::CORRUPTED_DATA, "Can't decompress data: the compressed data size ({}, this should include header size) is less than the header size ({})", size_compressed_without_checksum, static_cast(header_size)); diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 2b9a7295198..8083f7da24d 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -2133,7 +2133,7 @@ bool TCPHandler::receiveUnexpectedData(bool throw_exception) std::shared_ptr maybe_compressed_in; if (last_block_in.compression == Protocol::Compression::Enable) - maybe_compressed_in = std::make_shared(*in, /* allow_different_codecs */ true); + maybe_compressed_in = std::make_shared(*in, /* allow_different_codecs */ true, /* external_data */ query_kind != ClientInfo::QueryKind::SECONDARY_QUERY); else maybe_compressed_in = in; @@ -2157,7 +2157,7 @@ void TCPHandler::initBlockInput() /// with another codec that the rest of the data. Example: data sent by Distributed tables. if (state.compression == Protocol::Compression::Enable) - state.maybe_compressed_in = std::make_shared(*in, /* allow_different_codecs */ true); + state.maybe_compressed_in = std::make_shared(*in, /* allow_different_codecs */ true, /* external_data */ query_kind != ClientInfo::QueryKind::SECONDARY_QUERY); else state.maybe_compressed_in = in; From 9a2a6d71d2ec577ae3787a3f63cb1c3ee7b37943 Mon Sep 17 00:00:00 2001 From: Alex Katsman Date: Wed, 11 Sep 2024 15:09:03 +0000 Subject: [PATCH 2/2] Fix check in test_broken_projections --- .../test_broken_projections/test.py | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/tests/integration/test_broken_projections/test.py b/tests/integration/test_broken_projections/test.py index 578ff42369c..d161f3fba78 100644 --- a/tests/integration/test_broken_projections/test.py +++ b/tests/integration/test_broken_projections/test.py @@ -735,11 +735,14 @@ def test_mutation_with_broken_projection(cluster): f"ALTER TABLE {table_name} DELETE WHERE _part == 'all_0_0_0_4' SETTINGS mutations_sync = 1" ) + parts = get_parts(node, table_name) # All parts changes because this is how alter delete works, # but all parts apart from the first have only hardlinks to files in previous part. - assert ["all_0_0_0_5", "all_1_1_0_5", "all_2_2_0_5", "all_3_3_0_5"] == get_parts( - node, table_name - ) or ["all_1_1_0_5", "all_2_2_0_5", "all_3_3_0_5"] == get_parts(node, table_name) + assert ["all_0_0_0_5", "all_1_1_0_5", "all_2_2_0_5", "all_3_3_0_5"] == parts or [ + "all_1_1_0_5", + "all_2_2_0_5", + "all_3_3_0_5", + ] == parts # Still broken because it was hardlinked. broken = get_broken_projections_info(node, table_name) @@ -752,11 +755,13 @@ def test_mutation_with_broken_projection(cluster): f"ALTER TABLE {table_name} DELETE WHERE c == 13 SETTINGS mutations_sync = 1" ) - assert ["all_1_1_0_6", "all_2_2_0_6", "all_3_3_0_6"] == get_parts( - node, table_name - ) or ["all_0_0_0_6", "all_1_1_0_6", "all_2_2_0_6", "all_3_3_0_6"] == get_parts( - node, table_name - ) + parts = get_parts(node, table_name) + assert ["all_1_1_0_6", "all_2_2_0_6", "all_3_3_0_6"] == parts or [ + "all_0_0_0_6", + "all_1_1_0_6", + "all_2_2_0_6", + "all_3_3_0_6", + ] == parts # Not broken anymore. assert not get_broken_projections_info(node, table_name)