different error codes for internal and external data

This commit is contained in:
Alexander Tokmakov 2023-11-04 14:18:49 +01:00
parent 289fcccbfc
commit 0b0b1b21c6
6 changed files with 49 additions and 32 deletions

View File

@ -16,8 +16,8 @@ private:
bool nextImpl() override;
public:
explicit CompressedReadBuffer(ReadBuffer & in_, bool allow_different_codecs_ = false)
: CompressedReadBufferBase(&in_, allow_different_codecs_), BufferWithOwnMemory<ReadBuffer>(0)
explicit CompressedReadBuffer(ReadBuffer & in_, bool allow_different_codecs_ = false, bool external_data_ = false)
: CompressedReadBufferBase(&in_, allow_different_codecs_, external_data_), BufferWithOwnMemory<ReadBuffer>(0)
{
}

View File

@ -114,7 +114,8 @@ static void readHeaderAndGetCodecAndSize(
CompressionCodecPtr & codec,
size_t & size_decompressed,
size_t & size_compressed_without_checksum,
bool allow_different_codecs)
bool allow_different_codecs,
bool external_data)
{
uint8_t method = ICompressionCodec::readMethod(compressed_buffer);
@ -136,8 +137,11 @@ static void readHeaderAndGetCodecAndSize(
}
}
size_compressed_without_checksum = ICompressionCodec::readCompressedBlockSize(compressed_buffer);
size_decompressed = ICompressionCodec::readDecompressedBlockSize(compressed_buffer);
if (external_data)
codec->setExternalDataFlag();
size_compressed_without_checksum = codec->readCompressedBlockSize(compressed_buffer);
size_decompressed = codec->readDecompressedBlockSize(compressed_buffer);
/// This is for clang static analyzer.
assert(size_decompressed > 0);
@ -170,7 +174,8 @@ size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed,
codec,
size_decompressed,
size_compressed_without_checksum,
allow_different_codecs);
allow_different_codecs,
external_data);
auto additional_size_at_the_end_of_buffer = codec->getAdditionalSizeAtTheEndOfBuffer();
@ -221,7 +226,8 @@ size_t CompressedReadBufferBase::readCompressedDataBlockForAsynchronous(size_t &
codec,
size_decompressed,
size_compressed_without_checksum,
allow_different_codecs);
allow_different_codecs,
external_data);
auto additional_size_at_the_end_of_buffer = codec->getAdditionalSizeAtTheEndOfBuffer();
@ -254,7 +260,8 @@ size_t CompressedReadBufferBase::readCompressedDataBlockForAsynchronous(size_t &
}
}
static void readHeaderAndGetCodec(const char * compressed_buffer, size_t size_decompressed, CompressionCodecPtr & codec, bool allow_different_codecs)
static void readHeaderAndGetCodec(const char * compressed_buffer, size_t size_decompressed, CompressionCodecPtr & codec,
bool allow_different_codecs, bool external_data)
{
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks);
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed);
@ -278,17 +285,20 @@ static void readHeaderAndGetCodec(const char * compressed_buffer, size_t size_de
getHexUIntLowercase(method), getHexUIntLowercase(codec->getMethodByte()));
}
}
if (external_data)
codec->setExternalDataFlag();
}
void CompressedReadBufferBase::decompressTo(char * to, size_t size_decompressed, size_t size_compressed_without_checksum)
{
readHeaderAndGetCodec(compressed_buffer, size_decompressed, codec, allow_different_codecs);
readHeaderAndGetCodec(compressed_buffer, size_decompressed, codec, allow_different_codecs, external_data);
codec->decompress(compressed_buffer, static_cast<UInt32>(size_compressed_without_checksum), to);
}
void CompressedReadBufferBase::decompress(BufferBase::Buffer & to, size_t size_decompressed, size_t size_compressed_without_checksum)
{
readHeaderAndGetCodec(compressed_buffer, size_decompressed, codec, allow_different_codecs);
readHeaderAndGetCodec(compressed_buffer, size_decompressed, codec, allow_different_codecs, external_data);
if (codec->isNone())
{
@ -320,8 +330,8 @@ void CompressedReadBufferBase::setDecompressMode(ICompressionCodec::CodecMode mo
}
/// 'compressed_in' may be initialized lazily, but it must be set before the first call of 'readCompressedData'.
CompressedReadBufferBase::CompressedReadBufferBase(ReadBuffer * in, bool allow_different_codecs_)
: compressed_in(in), own_compressed_buffer(0), allow_different_codecs(allow_different_codecs_)
CompressedReadBufferBase::CompressedReadBufferBase(ReadBuffer * in, bool allow_different_codecs_, bool external_data_)
: compressed_in(in), own_compressed_buffer(0), allow_different_codecs(allow_different_codecs_), external_data(external_data_)
{
}

View File

@ -30,6 +30,9 @@ protected:
/// Allow reading data, compressed by different codecs from one file.
bool allow_different_codecs;
/// Report decompression errors as CANNOT_DECOMPRESS, not CORRUPTED_DATA
bool external_data;
/// Read compressed data into compressed_buffer. Get the size of the decompressed data from the block header. Verify the checksum if needed.
///
/// If always_copy is true then even if the compressed block is already stored in compressed_in.buffer()
@ -67,7 +70,7 @@ protected:
public:
/// 'compressed_in' may be initialized lazily, but it must be set before the first call of 'readCompressedData'.
explicit CompressedReadBufferBase(ReadBuffer * in = nullptr, bool allow_different_codecs_ = false);
explicit CompressedReadBufferBase(ReadBuffer * in = nullptr, bool allow_different_codecs_ = false, bool external_data_ = false);
virtual ~CompressedReadBufferBase();
/** Disable checksums.

View File

@ -14,12 +14,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_DECOMPRESS;
}
CompressionCodecMultiple::CompressionCodecMultiple(Codecs codecs_)
: codecs(codecs_)
{
@ -79,7 +73,7 @@ UInt32 CompressionCodecMultiple::doCompressData(const char * source, UInt32 sour
void CompressionCodecMultiple::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 decompressed_size) const
{
if (source_size < 1 || !source[0])
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Wrong compression methods list");
throw Exception(decompression_error_code, "Wrong compression methods list");
UInt8 compression_methods_size = source[0];
@ -95,10 +89,10 @@ void CompressionCodecMultiple::doDecompressData(const char * source, UInt32 sour
auto additional_size_at_the_end_of_buffer = codec->getAdditionalSizeAtTheEndOfBuffer();
compressed_buf.resize(compressed_buf.size() + additional_size_at_the_end_of_buffer);
UInt32 uncompressed_size = ICompressionCodec::readDecompressedBlockSize(compressed_buf.data());
UInt32 uncompressed_size = readDecompressedBlockSize(compressed_buf.data());
if (idx == 0 && uncompressed_size != decompressed_size)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Wrong final decompressed size in codec Multiple, got {}, expected {}",
throw Exception(decompression_error_code, "Wrong final decompressed size in codec Multiple, got {}, expected {}",
uncompressed_size, decompressed_size);
uncompressed_buf.resize(uncompressed_size + additional_size_at_the_end_of_buffer);

View File

@ -15,7 +15,6 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_DECOMPRESS;
extern const int LOGICAL_ERROR;
}
@ -96,14 +95,14 @@ UInt32 ICompressionCodec::decompress(const char * source, UInt32 source_size, ch
UInt8 header_size = getHeaderSize();
if (source_size < header_size)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS,
throw Exception(decompression_error_code,
"Can't decompress data: the compressed data size ({}, this should include header size) "
"is less than the header size ({})", source_size, static_cast<size_t>(header_size));
uint8_t our_method = getMethodByte();
uint8_t method = source[0];
if (method != our_method)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Can't decompress data with codec byte {} using codec with byte {}", method, our_method);
throw Exception(decompression_error_code, "Can't decompress data with codec byte {} using codec with byte {}", method, our_method);
UInt32 decompressed_size = readDecompressedBlockSize(source);
doDecompressData(&source[header_size], source_size - header_size, dest, decompressed_size);
@ -111,20 +110,20 @@ UInt32 ICompressionCodec::decompress(const char * source, UInt32 source_size, ch
return decompressed_size;
}
UInt32 ICompressionCodec::readCompressedBlockSize(const char * source)
UInt32 ICompressionCodec::readCompressedBlockSize(const char * source) const
{
UInt32 compressed_block_size = unalignedLoadLittleEndian<UInt32>(&source[1]);
if (compressed_block_size == 0)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Can't decompress data: header is corrupt with compressed block size 0");
throw Exception(decompression_error_code, "Can't decompress data: header is corrupt with compressed block size 0");
return compressed_block_size;
}
UInt32 ICompressionCodec::readDecompressedBlockSize(const char * source)
UInt32 ICompressionCodec::readDecompressedBlockSize(const char * source) const
{
UInt32 decompressed_block_size = unalignedLoadLittleEndian<UInt32>(&source[5]);
if (decompressed_block_size == 0)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Can't decompress data: header is corrupt with decompressed block size 0");
throw Exception(decompression_error_code, "Can't decompress data: header is corrupt with decompressed block size 0");
return decompressed_block_size;
}

View File

@ -13,6 +13,12 @@ namespace DB
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size);
namespace ErrorCodes
{
extern const int CANNOT_DECOMPRESS;
extern const int CORRUPTED_DATA;
}
/**
* Represents interface for compression codecs like LZ4, ZSTD, etc.
*/
@ -59,7 +65,10 @@ public:
CodecMode getDecompressMode() const{ return decompressMode; }
/// If the mode is set to CodecMode::Asynchronous, it must be followed by a call to flushAsynchronousDecompressRequests.
void setDecompressMode(CodecMode mode){ decompressMode = mode; }
void setDecompressMode(CodecMode mode) { decompressMode = mode; }
/// Report decompression errors as CANNOT_DECOMPRESS, not CORRUPTED_DATA
void setExternalDataFlag() { decompression_error_code = ErrorCodes::CANNOT_DECOMPRESS; }
/// Flush the results of previous asynchronous decompression requests.
/// This function must be called after several requests have been offloaded to HW.
@ -82,10 +91,10 @@ public:
static constexpr UInt8 getHeaderSize() { return COMPRESSED_BLOCK_HEADER_SIZE; }
/// Read size of compressed block from compressed source
static UInt32 readCompressedBlockSize(const char * source);
UInt32 readCompressedBlockSize(const char * source) const;
/// Read size of decompressed block from compressed source
static UInt32 readDecompressedBlockSize(const char * source);
UInt32 readDecompressedBlockSize(const char * source) const;
/// Read method byte from compressed source
static uint8_t readMethod(const char * source);
@ -131,6 +140,8 @@ protected:
/// Construct and set codec description from codec name and arguments. Must be called in codec constructor.
void setCodecDescription(const String & name, const ASTs & arguments = {});
int decompression_error_code = ErrorCodes::CORRUPTED_DATA;
private:
ASTPtr full_codec_desc;
CodecMode decompressMode{CodecMode::Synchronous};