add CodecMode for deflate codec

This commit is contained in:
jinjunzh 2022-07-06 18:18:55 -04:00
parent 65a91864b3
commit 567f90d8a4
7 changed files with 69 additions and 75 deletions

View File

@ -290,14 +290,14 @@ static void readHeaderAndGetCodec(const char * compressed_buffer, size_t size_de
}
void CompressedReadBufferBase::decompressTo(char * to, size_t size_decompressed, size_t size_compressed_without_checksum, UInt8 req_type)
void CompressedReadBufferBase::decompressTo(char * to, size_t size_decompressed, size_t size_compressed_without_checksum)
{
readHeaderAndGetCodec(compressed_buffer, size_decompressed, codec, allow_different_codecs);
codec->decompress(compressed_buffer, size_compressed_without_checksum, to, req_type);
codec->decompress(compressed_buffer, size_compressed_without_checksum, to);
}
void CompressedReadBufferBase::decompress(BufferBase::Buffer & to, size_t size_decompressed, size_t size_compressed_without_checksum, UInt8 req_type)
void CompressedReadBufferBase::decompress(BufferBase::Buffer & to, size_t size_decompressed, size_t size_compressed_without_checksum)
{
readHeaderAndGetCodec(compressed_buffer, size_decompressed, codec, allow_different_codecs);
@ -315,7 +315,7 @@ void CompressedReadBufferBase::decompress(BufferBase::Buffer & to, size_t size_d
to = BufferBase::Buffer(compressed_buffer + header_size, compressed_buffer + size_compressed_without_checksum);
}
else
codec->decompress(compressed_buffer, size_compressed_without_checksum, to.begin(), req_type);
codec->decompress(compressed_buffer, size_compressed_without_checksum, to.begin());
}
void CompressedReadBufferBase::decompressFlush() const
@ -326,6 +326,13 @@ void CompressedReadBufferBase::decompressFlush() const
}
}
void CompressedReadBufferBase::setDecompressMode(ICompressionCodec::CodecMode mode)
{
if (codec)
{
codec->setDecompressMode(mode);
}
}
/// 'compressed_in' could be initialized lazily, but before first call of 'readCompressedData'.
CompressedReadBufferBase::CompressedReadBufferBase(ReadBuffer * in, bool allow_different_codecs_)
: compressed_in(in), own_compressed_buffer(0), allow_different_codecs(allow_different_codecs_)

View File

@ -40,13 +40,15 @@ protected:
size_t readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum, bool always_copy);
size_t readCompressedDataBlockHold(size_t & size_decompressed, size_t & size_compressed_without_checksum);
/// Decompress into memory pointed by `to`
void decompressTo(char * to, size_t size_decompressed, size_t size_compressed_without_checksum, UInt8 req_type=0);
void decompressTo(char * to, size_t size_decompressed, size_t size_compressed_without_checksum);
/// This method can change location of `to` to avoid unnecessary copy if data is uncompressed.
/// It is more efficient for compression codec NONE but not suitable if you want to decompress into specific location.
void decompress(BufferBase::Buffer & to, size_t size_decompressed, size_t size_compressed_without_checksum, UInt8 req_type=0);
void decompress(BufferBase::Buffer & to, size_t size_decompressed, size_t size_compressed_without_checksum);
/// Flush all asynchronous decompress request
void decompressFlush() const;
void setDecompressMode(ICompressionCodec::CodecMode mode);
public:
/// 'compressed_in' could be initialized lazily, but before first call of 'readCompressedData'.
explicit CompressedReadBufferBase(ReadBuffer * in = nullptr, bool allow_different_codecs_ = false);

View File

@ -91,7 +91,7 @@ void CompressedReadBufferFromFile::seek(size_t offset_in_compressed_file, size_t
size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
{
size_t bytes_read = 0;
UInt8 req_type = 0;
ICompressionCodec::CodecMode decompress_mode = ICompressionCodec::CodecMode::Synchronous;
bool read_tail = false;
/// If there are unread bytes in the buffer, then we copy needed to `to`.
@ -108,13 +108,13 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
if (new_size_compressed)
{
req_type = 1;
decompress_mode = ICompressionCodec::CodecMode::Asynchronous;
}
else
{
decompressFlush(); /// here switch to unhold block in compress_in, we must flush for previous blocks completely hold in compress_in
new_size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum, false);
req_type = 0;
decompress_mode = ICompressionCodec::CodecMode::Synchronous;
}
size_compressed = 0; /// file_in no longer points to the end of the block in working_buffer.
@ -127,7 +127,8 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
/// need to skip some bytes in decompressed data (seek happened before readBig call).
if (nextimpl_working_buffer_offset == 0 && size_decompressed + additional_size_at_the_end_of_buffer <= n - bytes_read)
{
decompressTo(to + bytes_read, size_decompressed, size_compressed_without_checksum, req_type); //Async req
setDecompressMode(decompress_mode);
decompressTo(to + bytes_read, size_decompressed, size_compressed_without_checksum);
bytes_read += size_decompressed;
bytes += size_decompressed;
}
@ -141,6 +142,7 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
assert(size_decompressed + additional_size_at_the_end_of_buffer > 0);
memory.resize(size_decompressed + additional_size_at_the_end_of_buffer);
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
setDecompressMode(ICompressionCodec::CodecMode::Synchronous);
decompress(working_buffer, size_decompressed, size_compressed_without_checksum);
/// Read partial data from first block. Won't run here at second block.
@ -160,8 +162,8 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
memory.resize(size_decompressed + additional_size_at_the_end_of_buffer);
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
decompress(working_buffer, size_decompressed, size_compressed_without_checksum, 1);
setDecompressMode(ICompressionCodec::CodecMode::Asynchronous);
decompress(working_buffer, size_decompressed, size_compressed_without_checksum);
read_tail = true;
break;
}

View File

@ -291,11 +291,6 @@ uint8_t CompressionCodecDeflate::getMethodByte() const
return static_cast<uint8_t>(CompressionMethodByte::Deflate);
}
bool CompressionCodecDeflate::isAsyncSupported() const
{
return hwCodec->hwEnabled;
}
void CompressionCodecDeflate::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
@ -324,25 +319,30 @@ uint32_t CompressionCodecDeflate::doCompressData(const char * source, uint32_t s
void CompressionCodecDeflate::doDecompressData(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const
{
uint32_t res = 0;
if (hwCodec->hwEnabled)
res = hwCodec->doDecompressData(source, source_size, dest, uncompressed_size);
if (0 == res)
swCodec->doDecompressData(source, source_size, dest, uncompressed_size);
}
void CompressionCodecDeflate::doDecompressDataSW(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const
{
return swCodec->doDecompressData(source, source_size, dest, uncompressed_size);
}
void CompressionCodecDeflate::doDecompressDataReq(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size)
{
uint32_t res = 0;
if (hwCodec->hwEnabled)
res = hwCodec->doDecompressDataReq(source, source_size, dest, uncompressed_size);
if (0 == res)
swCodec->doDecompressData(source, source_size, dest, uncompressed_size);
switch (getDecompressMode())
{
case CodecMode::Synchronous:
{
uint32_t res = 0;
if (hwCodec->hwEnabled)
res = hwCodec->doDecompressData(source, source_size, dest, uncompressed_size);
if (0 == res)
swCodec->doDecompressData(source, source_size, dest, uncompressed_size);
break;
}
case CodecMode::Asynchronous:
{
uint32_t res = 0;
if (hwCodec->hwEnabled)
res = hwCodec->doDecompressDataReq(source, source_size, dest, uncompressed_size);
if (0 == res)
swCodec->doDecompressData(source, source_size, dest, uncompressed_size);
break;
}
case CodecMode::SoftwareFallback:
swCodec->doDecompressData(source, source_size, dest, uncompressed_size);
break;
}
}
void CompressionCodecDeflate::doDecompressDataFlush()

View File

@ -218,7 +218,6 @@ public:
//~CompressionCodecDeflate() ;
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
bool isAsyncSupported() const override;
protected:
bool isCompression() const override
@ -232,8 +231,6 @@ protected:
uint32_t doCompressData(const char * source, uint32_t source_size, char * dest) const override;
uint32_t doCompressDataSW(const char * source, uint32_t source_size, char * dest) const;
void doDecompressData(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const override;
void doDecompressDataReq(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) override;
void doDecompressDataSW(const char * source, uint32_t source_size, char * dest, uint32_t uncompressed_size) const override;
void doDecompressDataFlush() override;
private:

View File

@ -91,7 +91,7 @@ UInt32 ICompressionCodec::compress(const char * source, UInt32 source_size, char
return header_size + compressed_bytes_written;
}
UInt32 ICompressionCodec::decompress(const char * source, UInt32 source_size, char * dest, UInt8 req_type)
UInt32 ICompressionCodec::decompress(const char * source, UInt32 source_size, char * dest)
{
assert(source != nullptr && dest != nullptr);
@ -105,18 +105,7 @@ UInt32 ICompressionCodec::decompress(const char * source, UInt32 source_size, ch
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Can't decompress data with codec byte {} using codec with byte {}", method, our_method);
UInt32 decompressed_size = readDecompressedBlockSize(source);
switch (req_type)
{
case 0:
doDecompressData(&source[header_size], source_size - header_size, dest, decompressed_size);
break;
case 1:
doDecompressDataReq(&source[header_size], source_size - header_size, dest, decompressed_size);
break;
case 2:
doDecompressDataSW(&source[header_size], source_size - header_size, dest, decompressed_size);
break;
}
doDecompressData(&source[header_size], source_size - header_size, dest, decompressed_size);
return decompressed_size;
}
@ -124,6 +113,7 @@ UInt32 ICompressionCodec::decompress(const char * source, UInt32 source_size, ch
void ICompressionCodec::decompressFlush()
{
doDecompressDataFlush();
decompressMode = CodecMode::Synchronous;
}
UInt32 ICompressionCodec::readCompressedBlockSize(const char * source)

View File

@ -26,6 +26,13 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size);
class ICompressionCodec : private boost::noncopyable
{
public:
enum class CodecMode
{
Synchronous, //synchronous request by default;
Asynchronous, //asynchronous request, must be used in pair with decompressFlush;
SoftwareFallback //Fallback to SW decompressor;
};
virtual ~ICompressionCodec() = default;
/// Byte which indicates codec in compressed file
@ -46,21 +53,21 @@ public:
UInt32 compress(const char * source, UInt32 source_size, char * dest) const;
/// Decompress bytes from compressed source to dest. Dest should preallocate memory;
// reqType is specific for HW decompressor:
//0 means synchronous request by default;
//1 means asynchronous request, must be used in pair with decompressFlush;
//2 means SW decompressor instead of HW
UInt32 decompress(const char * source, UInt32 source_size, char * dest, UInt8 req_type = 0);
UInt32 decompress(const char * source, UInt32 source_size, char * dest);
CodecMode getDecompressMode() const
{
return decompressMode;
}
void setDecompressMode(CodecMode mode)
{
decompressMode = mode;
}
/// Flush all asynchronous request for decompression
void decompressFlush(void);
/// Some codecs (QPL_deflate, for example) support asynchronous request
virtual bool isAsyncSupported() const
{
return false;
}
/// Number of bytes, that will be used to compress uncompressed_size bytes with current codec
virtual UInt32 getCompressedReserveSize(UInt32 uncompressed_size) const
{
@ -111,18 +118,6 @@ protected:
/// Actually decompress data without header
virtual void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const = 0;
/// Asynchronous decompression request to HW decompressor
virtual void doDecompressDataReq(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size)
{
doDecompressData(source, source_size, dest, uncompressed_size);
}
/// SW decompressor instead of HW
virtual void doDecompressDataSW(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
{
doDecompressData(source, source_size, dest, uncompressed_size);
}
/// Flush asynchronous request for decompression
virtual void doDecompressDataFlush()
{
@ -132,6 +127,7 @@ protected:
private:
ASTPtr full_codec_desc;
CodecMode decompressMode{CodecMode::Synchronous};
};
}