Merge pull request #69269 from alexkats/external-data-checksum

Use another error code for external data in buffer
This commit is contained in:
Alexey Katsman 2024-09-12 09:38:09 +00:00 committed by GitHub
commit d7f271c658
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 25 additions and 18 deletions

View File

@ -39,7 +39,7 @@ using Checksum = CityHash_v1_0_2::uint128;
/// Validate checksum of data, and if it mismatches, find out possible reason and throw exception.
static void validateChecksum(char * data, size_t size, const Checksum expected_checksum)
static void validateChecksum(char * data, size_t size, const Checksum expected_checksum, bool external_data)
{
auto calculated_checksum = CityHash_v1_0_2::CityHash128(data, size);
if (expected_checksum == calculated_checksum)
@ -64,6 +64,8 @@ static void validateChecksum(char * data, size_t size, const Checksum expected_c
"this can be caused by disk bit rot. This exception protects ClickHouse "
"from data corruption due to hardware failures.";
int error_code = external_data ? ErrorCodes::CANNOT_DECOMPRESS : ErrorCodes::CHECKSUM_DOESNT_MATCH;
auto flip_bit = [](char * buf, size_t pos)
{
buf[pos / 8] ^= 1 << pos % 8;
@ -87,7 +89,7 @@ static void validateChecksum(char * data, size_t size, const Checksum expected_c
{
message << ". The mismatch is caused by single bit flip in data block at byte " << (bit_pos / 8) << ", bit " << (bit_pos % 8) << ". "
<< message_hardware_failure;
throw Exception::createDeprecated(message.str(), ErrorCodes::CHECKSUM_DOESNT_MATCH);
throw Exception::createDeprecated(message.str(), error_code);
}
flip_bit(tmp_data, bit_pos); /// Restore
@ -102,10 +104,10 @@ static void validateChecksum(char * data, size_t size, const Checksum expected_c
{
message << ". The mismatch is caused by single bit flip in checksum. "
<< message_hardware_failure;
throw Exception::createDeprecated(message.str(), ErrorCodes::CHECKSUM_DOESNT_MATCH);
throw Exception::createDeprecated(message.str(), error_code);
}
throw Exception::createDeprecated(message.str(), ErrorCodes::CHECKSUM_DOESNT_MATCH);
throw Exception::createDeprecated(message.str(), error_code);
}
static void readHeaderAndGetCodecAndSize(
@ -151,7 +153,7 @@ static void readHeaderAndGetCodecAndSize(
"Most likely corrupted data.", size_compressed_without_checksum);
if (size_compressed_without_checksum < header_size)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Can't decompress data: "
throw Exception(external_data ? ErrorCodes::CANNOT_DECOMPRESS : ErrorCodes::CORRUPTED_DATA, "Can't decompress data: "
"the compressed data size ({}, this should include header size) is less than the header size ({})",
size_compressed_without_checksum, static_cast<size_t>(header_size));
}
@ -202,7 +204,7 @@ size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed,
readBinaryLittleEndian(checksum.low64, checksum_in);
readBinaryLittleEndian(checksum.high64, checksum_in);
validateChecksum(compressed_buffer, size_compressed_without_checksum, checksum);
validateChecksum(compressed_buffer, size_compressed_without_checksum, checksum, external_data);
}
ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed_without_checksum + sizeof(Checksum));
@ -247,7 +249,7 @@ size_t CompressedReadBufferBase::readCompressedDataBlockForAsynchronous(size_t &
readBinaryLittleEndian(checksum.low64, checksum_in);
readBinaryLittleEndian(checksum.high64, checksum_in);
validateChecksum(compressed_buffer, size_compressed_without_checksum, checksum);
validateChecksum(compressed_buffer, size_compressed_without_checksum, checksum, external_data);
}
ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed_without_checksum + sizeof(Checksum));
@ -307,7 +309,7 @@ void CompressedReadBufferBase::decompress(BufferBase::Buffer & to, size_t size_d
UInt8 header_size = ICompressionCodec::getHeaderSize();
if (size_compressed_without_checksum < header_size)
throw Exception(ErrorCodes::CORRUPTED_DATA,
throw Exception(external_data ? ErrorCodes::CANNOT_DECOMPRESS : ErrorCodes::CORRUPTED_DATA,
"Can't decompress data: the compressed data size ({}, this should include header size) is less than the header size ({})",
size_compressed_without_checksum, static_cast<size_t>(header_size));

View File

@ -2133,7 +2133,7 @@ bool TCPHandler::receiveUnexpectedData(bool throw_exception)
std::shared_ptr<ReadBuffer> maybe_compressed_in;
if (last_block_in.compression == Protocol::Compression::Enable)
maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in, /* allow_different_codecs */ true);
maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in, /* allow_different_codecs */ true, /* external_data */ query_kind != ClientInfo::QueryKind::SECONDARY_QUERY);
else
maybe_compressed_in = in;
@ -2157,7 +2157,7 @@ void TCPHandler::initBlockInput()
/// with another codec that the rest of the data. Example: data sent by Distributed tables.
if (state.compression == Protocol::Compression::Enable)
state.maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in, /* allow_different_codecs */ true);
state.maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in, /* allow_different_codecs */ true, /* external_data */ query_kind != ClientInfo::QueryKind::SECONDARY_QUERY);
else
state.maybe_compressed_in = in;

View File

@ -735,11 +735,14 @@ def test_mutation_with_broken_projection(cluster):
f"ALTER TABLE {table_name} DELETE WHERE _part == 'all_0_0_0_4' SETTINGS mutations_sync = 1"
)
parts = get_parts(node, table_name)
# All parts changes because this is how alter delete works,
# but all parts apart from the first have only hardlinks to files in previous part.
assert ["all_0_0_0_5", "all_1_1_0_5", "all_2_2_0_5", "all_3_3_0_5"] == get_parts(
node, table_name
) or ["all_1_1_0_5", "all_2_2_0_5", "all_3_3_0_5"] == get_parts(node, table_name)
assert ["all_0_0_0_5", "all_1_1_0_5", "all_2_2_0_5", "all_3_3_0_5"] == parts or [
"all_1_1_0_5",
"all_2_2_0_5",
"all_3_3_0_5",
] == parts
# Still broken because it was hardlinked.
broken = get_broken_projections_info(node, table_name)
@ -752,11 +755,13 @@ def test_mutation_with_broken_projection(cluster):
f"ALTER TABLE {table_name} DELETE WHERE c == 13 SETTINGS mutations_sync = 1"
)
assert ["all_1_1_0_6", "all_2_2_0_6", "all_3_3_0_6"] == get_parts(
node, table_name
) or ["all_0_0_0_6", "all_1_1_0_6", "all_2_2_0_6", "all_3_3_0_6"] == get_parts(
node, table_name
)
parts = get_parts(node, table_name)
assert ["all_1_1_0_6", "all_2_2_0_6", "all_3_3_0_6"] == parts or [
"all_0_0_0_6",
"all_1_1_0_6",
"all_2_2_0_6",
"all_3_3_0_6",
] == parts
# Not broken anymore.
assert not get_broken_projections_info(node, table_name)